AAO DRAMA/DRAMA2 C++ Interface
DRAMA C++11 and later interface
thread.hh
Go to the documentation of this file.
1#ifndef _DRAMA2_THREAD_INC
2#define _DRAMA2_THREAD_INC
3
13/*
14 * History:
15 04-Mar-2014 - TJF - Original version
16
17 * The above ID is for Doxygen, this one has the format ACMM is looking for.
18 * "@(#) $Id$"
19 */
20
22#include <thread>
23#include <queue>
24#include <deque>
25#include <map>
26#include <chrono>
27#include <future>
28#include <condition_variable>
29#include <csignal>
30namespace drama {
31
32 class Path; /* drama::Path */
33
37 namespace thread {
38
39 /* @internal
40 * Information needed by threads waiting on conditions.
41 *
42 * Note - for kick events, the arg will stay around until
43 * the next event on this thread. This is because we can have
44 * multiple threads waiting for a kick event and we must ensure
45 * any argument is valid for all. We use a shared_ptr type to
46 * manage this.
47 */
48 struct WaitEventData {
49 sds::IdPtr arg;
50 TransEvtInfo eventInfo;
51 };
52
53 /* @internal
54 * A queue of WaitEventData.
55 */
56 typedef std::queue<WaitEventData> WaitEventDataQueue;
57
58
59 /* @internal
60 * The type of a pointer to a condition variable
61 */
62 typedef std::shared_ptr<std::condition_variable_any>
63 WaitEventCondPtrType;
64
65 /* @internal
66 * Information on events we are expecting related to a particular
67 * transaction.
68 */
69 struct WaitEventDetails {
70 std::thread::id _thread; // Thread which is waiting
71 DitsTransIdType _transId; // Transaction we are waiting on, 0 for kick.
72 WaitEventDataQueue _dataQueue; // Queue of transaction data.
73 WaitEventCondPtrType _condition; // Used to notify the thread
74 // of events.
75 bool _waiting; // Are we waiting.
76
77 Path *_pathObj; // Path message is along. IF not kick
78 DitsPathType _ditsPath; // Underlying DITS path.
79 DitsMsgType _type; // Message type, if not kick.
80 std::string _messName; // Message Name, if not kick
81
82
83 explicit WaitEventDetails(
85 Path *thePath);
86 bool DataAvail() const {
87 return !_dataQueue.empty();
88 }
89 ~WaitEventDetails() {
90#if 0
92 "WaitEvenDetails destructor - transid %p, this %p\n",
93 static_cast<void *>(_transId),
94 static_cast<void *>(this));
95#endif
96 }
97
98 };
99
100 /*
101 * A map of wait event details, indexed by thread id.
102 */
103 typedef std::map<std::thread::id, WaitEventDetails> WaitEventMapType;
104 typedef WaitEventMapType::iterator WaitEventMapIter;
105 typedef WaitEventMapType::const_iterator WaitEventMapIterConst;
106
107 class TMessHandler;
108
109
110 /*
111 * Return the number of events outstanding across all threads.
112 */
113 unsigned WaitEventMapGetEvents(std::weak_ptr<Task> theTask,
114 const WaitEventMapType &waitEventMap);
120 class ProcessInfo {
121 private:
122 std::shared_ptr<TMessHandler> _handler;
123 std::shared_ptr<Path> _pathObj;
124 const DitsMsgType _type; // Message type, if not kick.
125 const std::string _messName; // Message Name, if not kick
126 public:
134 ProcessInfo(std::shared_ptr<TMessHandler> handler,
135 const WaitEventDetails &details) :
136 _handler(handler),
137 _pathObj(details._pathObj, drama::nodel()),
138 _type(details._type),
139 _messName(details._messName) {
140
141 }
150 const TMessHandler &GetHandler() const {
151 return *_handler;
152 }
157 const Path &GetPath() const {
158 return *_pathObj;
159 }
164 DitsMsgType GetMessType() const {
165 return _type;
166 }
174 const char * GetMessTypeCStr() const {
175 return DitsGetMsgTypeStr(_type);
176 }
181 const std::string &GetMessName() const {
182 return _messName;
183 }
185 }; // class ProcessInfo.
186
187 /*
188 * Abstract class for objects used to process transaction events.
189 */
190 class TransEvtProcessor {
191 public:
208 virtual bool Process(ProcessInfo messInfo,
209 const TransEvtInfo & eventInfo,
210 const sds::IdPtr &arg) = 0;
211
223 virtual void NewTransaction(DitsMsgType msgType, DitsTransIdType tid) = 0;
245 virtual bool WaitTimeout(std::chrono::steady_clock::time_point *until);
246
248 virtual ~TransEvtProcessor() {}
249 };
250
255 class TSdsListToUserObj : public sds::PrintObjectCR {
256 private:
257 TMessHandler *_handler;
258 public:
263 TSdsListToUserObj(TMessHandler *messageHandler);
268 void Print(const std::string &line) const override;
269 };
281 class TMessHandler {
282 TSdsListToUserObj _sdsListToUserObj = this;
283 public:
284
287 TMessHandler& operator=(const TMessHandler &rhs) = delete;
288
291 TMessHandler(const TMessHandler &source) = delete;
292
293 TMessHandler() {}
294
300 virtual Task::mutexType & Lock() const = 0;
324 drama::Path *pathObj ) = 0;
338 virtual void MessageUser(const std::string &text) const = 0;
339
340
362 template<typename... Types>
363 void MessageUser(const char *format, Types... args) {
364
365 /*
366 * Our approach is to write the output to a string, via
367 * SafePrintf(), then output that in one operation. Since
368 * std::ostream devices will then do the output in one operation.
369 */
370 std::stringstream sstrm;
372 MessageUser(sstrm.str());
373
374
375 }
376
377
400 virtual void WaitForTransaction(
401 std::weak_ptr<Task> theTask,
402 TransEvtInfo * const eventInfo,
403 TransEvtProcessor * const eventProcessor,
404 sds::IdPtr * const arg = nullptr) ;
405
406
429 virtual bool WaitForTransactionUntil(
430 std::weak_ptr<Task> theTask,
431 TransEvtInfo * const eventInfo,
432 TransEvtProcessor * const eventProcessor,
433 const std::chrono::steady_clock::time_point &until,
434 sds::IdPtr * const arg = nullptr);
435
436
437
444 virtual Dits___CurActType GetMessageContext() const = 0;
445
450 virtual std::shared_ptr<Task> GetTask() const = 0;
457 virtual const sds::PrintObjectCR &SdsListToUser() const;
458
461 virtual ~TMessHandler() {}
462
463
464 private:
465 /* Features used by WaitForTransaction()
466 */
467
476 virtual void ClearWait(bool complete) = 0;
477 /*
478 * Return a pointer to the event details for the specified thread.
479 */
480 virtual WaitEventDetails *FindWaitEventDetails(std::thread::id) = 0;
481
482 /*
483 *
484 * Initiate waiting for a transaction and return a pointer
485 * to the item the wait event details. Presumption is that
486 * SetupWaitEvent() has been invoked.
487 */
488 WaitEventDetails * WaitForTransactionStart();
489 /*
490 * The wait for transaction operation has finished - tidy up.
491 *
492 * @param Event details returned by WaitForTransactionStart()
493 * @param evenInfo Details on the event which caused the
494 * wait to finish are returned here.
495 * @param arg The output argument from the transaction, if any.
496 * An address of and sds::IdPtr is supplied.
497 * If you supply a null pointer, it is
498 * ignored. Otherwise, any argument to the
499 * transaction event is copied into here.
500 * If there is no argument to the
501 * transaction it will refer to a null
502 * SDS item.
503 */
504 void WaitForTransactionFinish(
505 const WaitEventData &details,
506 TransEvtInfo * const eventInfo,
507 sds::IdPtr * const arg);
508
509
510 }; // class TMessHandler
511
535 class AccessDrama {
536 Dits___CurActType _entryDetails;
537 Task::guardType _lock;
538 public:
551 ~AccessDrama();
552 };
553
559 class SignalBlocker {
560 sigset_t oldSet;
561 public:
569 };
571 /*
572 * Block all signals to the current thread.
573 */
574 extern void BlockSignals();
575
576
577 /* @internal
578 *
579 * Puts an entry in waitEventMap for this thread and the specified
580 * transaction id. If the transaction id is 0, then it is presumed
581 * the entry is for a kick.
582 *
583 * Thread/Lock Entry Info:
584 * May/may not be running in DRAMA message thread. (Probably not)
585 * DRAMA Lock Taken.
586 *
587 */
588 void SetupWaitEvent(DitsTransIdType id,
590 TMessHandler *h,
591 WaitEventMapType *waitEventMap);
592
593
594
680 const std::string &fileName,
681 const std::string &arguments="",
682 const std::string &node="",
683 int priority = 0,
684 bool absPriority = false,
685 bool setNames = true,
686 unsigned stackBytes = 0);
687
786 bool RunProgramWaitUntil(std::chrono::steady_clock::time_point until,
788 const std::string &fileName,
789 const std::string &arguments="",
790 const std::string &node="",
791 int priority = 0,
792 bool absPriority = false,
793 bool setNames = true,
794 unsigned stackBytes = 0);
795
796
797
798
799 } // namespace thread
800} // namespace drama
801
802#endif
A Class which provides access to DRAMA's message sending facilities.
Definition path.hh:689
std::recursive_timed_mutex mutexType
Defines the type of our mutex.
Definition task.hh:455
std::lock_guard< mutexType > guardType
Defines the type of a lock guard using our mutex type.
Definition task.hh:460
Abstract class which is sub-classed to print SDS item listings.
Definition sds.hh:310
AccessDrama(const TMessHandler &messHandler)
Access the DRAMA context of a particular message handler object.
AccessDrama(Task::mutexType &taskLock)
Takes the DRAMA lock and saves the current context.
A class used by threads to access and enable the DRAMA context of an action or of a particular UFACE ...
Definition thread.hh:562
const Path & GetPath() const
Return the Path along which the message was sent.
Definition thread.hh:184
const std::string & GetMessName() const
Return the message name (Action name etc).
Definition thread.hh:208
DitsMsgType GetMessType() const
Return the type of the message that was sent.
Definition thread.hh:191
ProcessInfo(std::shared_ptr< TMessHandler > handler, const WaitEventDetails &details)
Construct one of these objects.
Definition thread.hh:161
const TMessHandler & GetHandler() const
Return the DRAMA Message handler object that was used to send the message.
Definition thread.hh:177
const char * GetMessTypeCStr() const
Return the type of the message that was sent as a C string.
Definition thread.hh:201
A class with information used by the TransEvtProcessor methods.
Definition thread.hh:147
SignalBlocker()
Constructor - Block signals in the current thread.
~SignalBlocker()
Destructor - read the signal mask settings to that when the constructor was run.
Constructing an object of this type will block all blockable UNIX signals in the current thread.
Definition thread.hh:586
void MessageUser(const char *format, Types... args)
Use DRAMA to send a message to the user - safe format.
Definition thread.hh:390
virtual ~TMessHandler()
Destructor.
Definition thread.hh:488
virtual Dits___CurActType GetMessageContext() const =0
Get the DRAMA Context associated with the action/UFACE event.
virtual void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj)=0
Sets up a wait event for this thread.
TMessHandler(const TMessHandler &source)=delete
Copy constructor - deleted.
virtual bool WaitForTransactionUntil(std::weak_ptr< Task > theTask, TransEvtInfo *const eventInfo, TransEvtProcessor *const eventProcessor, const std::chrono::steady_clock::time_point &until, sds::IdPtr *const arg=nullptr)
Block the current thread until a message for the transaction specified to SetupWaitEvent() occurs or ...
TMessHandler & operator=(const TMessHandler &rhs)=delete
Assignment operator - deleted.
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
virtual void WaitForTransaction(std::weak_ptr< Task > theTask, TransEvtInfo *const eventInfo, TransEvtProcessor *const eventProcessor, sds::IdPtr *const arg=nullptr)
Block the current thread until a message for the transaction specified to SetupWaitEvent() occurs.
virtual std::shared_ptr< Task > GetTask() const =0
Get a reference to the DRAMA task we are part of.
virtual void MessageUser(const std::string &text) const =0
Use DRAMA to send a message to the user.
virtual Task::mutexType & Lock() const =0
Reference the DRAMA Task lock.
This interface class must be implemented by classes which have threads waiting for messages.
Definition thread.hh:308
void Print(const std::string &line) const override
Prints one line of an SDS listing.
TSdsListToUserObj(TMessHandler *messageHandler)
Constructor.
Object used to print SDS objects using MessageUser from TMessHandler objects.
Definition thread.hh:282
DRAMA 2 include file - Message Handler class definition.
std::shared_ptr< Id > IdPtr
A shared pointer for sds::Id items.
Definition sds.hh:3318
bool RunProgramWaitUntil(std::chrono::steady_clock::time_point until, TMessHandler *messHandler, const std::string &fileName, const std::string &arguments="", const std::string &node="", int priority=0, bool absPriority=false, bool setNames=true, unsigned stackBytes=0)
Load and run a program using the DRAMA loading mechanisms.
void RunProgram(TMessHandler *messHandler, const std::string &fileName, const std::string &arguments="", const std::string &node="", int priority=0, bool absPriority=false, bool setNames=true, unsigned stackBytes=0)
Load and run a program using the DRAMA loading mechanisms.
void CreateRunDramaTask()
Create and run a DRAMA task, with standard exception handling.
Definition task.hh:1322
void SafePrintf(std::ostream &ostream, const char *str)
Safe formatted write to a stream.
The drama namespace contains all the classes, types etc of the DRAMA 2 implementation.
Definition drama.hh:93
Structure is used to store details about a DRAMA reschedule message relating to a transaction,...
Definition task.hh:128
Declare an operator to be used as a deletion operator by std::shared_ptr.
Definition task.hh:114