AAO DRAMA/DRAMA2 C++ Interface
DRAMA C++11 and later interface
path.hh
Go to the documentation of this file.
1#ifndef _DRAMA2_PATH_INC
2#define _DRAMA2_PATH_INC
20/*
21 * History:
22 06-Mar-2014 - TJF - Original version. Starting from dcpppath.h
23 30-Sep-2016 - TJF - Start of history after development.
24 Add MonForwardEventHandler class used as event
25 handler for monitors, main point is the triggers
26 don't cause exceptions.
27 Don't throw exceptions when messages return bad
28 status, the event handlers can do that, otherwise
29 event handlers cannot stop such errors occurring.
30 Add Two new overrides of MonitorForward().
31 30-Sep-2016 - TJF - Add SpawnKickArg() and SpawnKickArgUpdate() static
32 methods.
33
34
35
36 * The above ID is for Doxygen, this one has the format ACMM is looking for.
37 * "@(#) $Id$"
38 */
39#include "drama.hh"
40#include "DitsInteraction.h"
41#include "DitsFix.h"
42#include "DitsBulk.h"
43#include <drama/thread.hh>
44#include <set>
45#include <atomic>
46
47#include <initializer_list>
48
49namespace drama {
50
/**
 * Build a steady-clock time point that lies the given number of seconds
 * after "now".
 *
 * @param secs Offset from the current time, in (possibly fractional) seconds.
 * @return The current std::chrono::steady_clock time plus the offset.
 */
inline std::chrono::steady_clock::time_point CreateFutureTimepoint(double secs)
{
    using SteadyClock = std::chrono::steady_clock;

    // The offset expressed as a floating point number of seconds.
    const std::chrono::duration<double> offset(secs);

    /*
     * A double based duration has a higher resolution than the steady
     * clock, since the latter uses an integral representation.  The
     * duration_cast is therefore required before the addition, and it
     * gives the highest resolution result that can actually be waited on.
     */
    return SteadyClock::now() +
           std::chrono::duration_cast<SteadyClock::duration>(offset);
}
93 class Buffers {
94 private:
95 DitsPathInfoType _bufInfo;
96 public:
108 Buffers( long int MessageBytes = 800,
109 long int MaxMessages = 2,
110 long int ReplyBytes = 800,
111 long int MaxReplies = 10) {
112 _bufInfo.MessageBytes = MessageBytes;
113 _bufInfo.MaxMessages = MaxMessages;
114 _bufInfo.ReplyBytes = ReplyBytes;
115 _bufInfo.MaxReplies = MaxReplies;
116 }
123 Buffers ( const DitsPathInfoType & info) : _bufInfo(info) {};
126 operator const DitsPathInfoType * () const {
127 return &_bufInfo;
128 }
133 long int MessageBytes() const { return _bufInfo.MessageBytes; }
138 long int MaxMessages() const { return _bufInfo.MaxMessages; }
143 long int ReplyBytes() const { return _bufInfo.ReplyBytes; }
148 long int MaxReplies() const { return _bufInfo.MaxReplies; };
149 }; // class Buffers.
153 typedef std::vector<ErsMessageType> ErsMessageVector;
154
155
156
167 class MessageEventHandler : public thread::TransEvtProcessor {
168 private:
169 /*
170 * Converts argument to a MsgOut() message into call to MessageUser().
171 */
172 void MessageUserRaw(thread::ProcessInfo messInfo, const sds::IdPtr &arg);
173
174 /*
175 * Converts argument to an Ers() message and calls ErrorReport().
176 */
177 virtual void ErrorReportRaw(thread::ProcessInfo messInfo,
178 const sds::IdPtr &arg);
179
181 public:
196 const TransEvtInfo & eventInfo,
197 const sds::IdPtr &arg) override final;
198
215 virtual void NewTransaction(DitsMsgType msgType, DitsTransIdType tid) override;
216
217
218
253 StatusType status, const sds::IdPtr &arg);
254
255
270 const sds::IdPtr &arg);
271
272
286
303 StatusType status, const sds::IdPtr &arg);
304
314
332 const std::string &task,
333 const std::string &message);
334
353 const std::string &task,
355
375 const sds::IdPtr &arg);
376
377
398 DitsBulkInfoType bulkInfo);
399
414
415 };
425 class MonForwardEventHandler : public MessageEventHandler {
426 public:
441 virtual void TriggerReceived(thread::ProcessInfo messInfo,
443 const sds::IdPtr &arg) override;
444
445 };
446
449 class GetPathEventHandler : public thread::TransEvtProcessor {
450 public:
470 virtual bool Process(thread::ProcessInfo messInfo,
471 const TransEvtInfo & eventInfo,
472 const sds::IdPtr &arg) override;
473
485 virtual void NewTransaction(DitsMsgType msgType, DitsTransIdType tid) override;
486 };
487
502 class IsRunningType : public drama::MessageEventHandler {
503 public:
507 enum class EventStatus {
508 NotStarted,
509 Running,
510 Failed,
511 Completed
512 };
517 EventStatus Get() const { return _status; };
518
524 std::string GetStr(EventStatus val) const;
525
530 std::string GetStr() const { return GetStr(_status); }
531
532
537 bool operator()() const {
538 if (_status == EventStatus::Running)
539 {
540 //std::cerr << "EventStatus true" << std::endl;
541 return true;
542 }
543 else
545 //std::cerr << "EventStatus false" << std::endl;
546 return false;
547 }
548 }
550 IsRunningType() {
552 }
553 protected:
562 _status = newVal;
563 }
565
566 private:
567 /* The variable we maintain. Use atomic since this is intended for
568 * read/write from one thread, read from another
569 */
570 std::atomic<EventStatus> _status;
571
572 protected:
581
582 /* We won't override ThreadWaitAbort, what to do is unclear as the
583 * transaction remains running */
584
599 StatusType status) override;
619 const drama::sds::IdPtr &arg) override;
631 StatusType status) override;
632
633 }; // Class IsRunningType.
634
635
662 class Path {
663
664 private:
665
666 /*
667 * We need a static list of Path objects, so that we can handle
668 * DITS Disconnected events and translate them to Path objects to
669 * be notified.
670 *
671 * Note - I traditionally use a list here, but in fact, a std::set
672 * makes more sense. We define the type here, but use a
673 * static function to construct and access the object to avoid
674 * static initialization order issues.
675 */
676 typedef std::set<Path *> ObjectListType;
677 static ObjectListType &ObjectList();
678
679
680 /* Indicates if the disconnect handler has been enabled.*/
681 static bool _DisconnectEnabled;
682
683 /*
684 * The disconnect handler details when our disconnect handler
685 * was enabled.
686 */
687 static DitsDisConnectRoutineType _OrigDisconnect;
688 static DVOIDP _OrigDisconnectData;
690 /*
691 * Basic message event handler.
692 */
693 static MessageEventHandler _simpleEventProcessor;
694 static GetPathEventHandler _getPathEventProcessor;
695 static MonForwardEventHandler _simpleMonEventProcessor;
696 /*
697 * Pointer to the DRAMA task we are part of.
698 * (Note, in other examples, we make the pointer const, but
699 * if we do that here, we can't use the move assignment operator)
700 */
701 std::weak_ptr<Task> _theTask;
702
703
704 /* The path to the task, if state != INITIAL */
705 DitsPathType _path = nullptr;
706 /* The state of this task object (NOTIFY not yet supported) */
707 enum class PathState { INITIAL, SELFINIT, PATHWAIT, LOADING,
708 ACTIVE, DIED, NOTIFY }
709 _state = PathState::INITIAL;
710 /* Buffer sizes */
711 Buffers _buffers;
712 /* Flags to the PathGet operation */
713 int _pathFlags = 0;
714 /* Task priority */
715 int _priority = 0;
716 /* Flags to the load command */
717 int _loadFlags = 0;
718 /* Set true to indicate the last GetPath loaded this task */
719 bool _loaded = false;
720 /* Do we log loads */
721 bool _logLoad = true;
722 /* The name of the task */
723 std::string _taskName;
724 /* The location of the task */
725 std::string _taskLocation;
726 /* The file to load the task from */
727 std::string _taskFile;
728 /* The requested process name */
729 std::string _process;
730 /* Argument to the load command */
731 std::string _argument;
732 /* Decode string */
733 std::string _decode;
734
735 unsigned _stackSize = 0;
736
737 /*
738 * Various internal routines.
739 */
740 /* Enable the disconnect handler, if not already enabled
741 * Can only be called if DitsAppInit() has been run, hence
742 * not normally from a constructor.
743 */
744 void EnableDisconnect();
745
746 /*
747 * Invoked by constructors to do common stuff.
748 */
749 void ConstructorHelper();
750
751
752 public: /* These two can be public */
753 /*
754 * Return a string (char *) version of a state enum. Used in
755 * logging and error messages.
756 */
757 const char *StateString(PathState state) const;
758 /*
759 * Return a string (char *) version of the current state. Used in
760 * logging and error messages.
761 */
762 const char *StateString() const {
763 return StateString(_state);
764 }
765 private:
766
767 /*
768 * Start the GetPath operation, returning the transaction id.
769 */
770 DitsTransIdType GetPathStart(thread::TMessHandler *action);
771
772 /*
773 * Load the task.
774 */
775 DitsTransIdType LoadTask(thread::TMessHandler *action);
776 /*
777 * A load (started by GetPathStart()) has completed, process it
778 * and set the GetPath operation
779 */
780 DitsTransIdType LoadCompleteGetPath(
781 thread::TMessHandler *action,
783 const TransEvtInfo &eventInfo); // Transaction event details.
784
785 /*
786 * Complete the GetPath operation.
787 */
788 void GetPathComplete(
789 const TransEvtInfo &eventInfo); // Transaction event details.
790
791 /*
792 * Underlying send routine used by the various methods which send
793 * a DRAMA message.
794 *
795 * @param action A pointer to the threaded action which is
796 * executing this operation.
797 * @param type Message type.
798 * @param name Name of the action to start.
799 * @param arg Argument to the action.
800 * @param eventProcessor An object to use to process events.
801 * All Send() does is to invoke the NewTransaction()
802 * method when it has sent the message.
803 *
804 * @return The DRAMA transaction ID of the message.
805 */
806 DitsTransIdType Send(
807 thread::TMessHandler *action,
809 const std::string & name,
810 int flags,
811 const sds::Id &arg,
812 MessageEventHandler * const eventProcessor);
813
814
815 /*
816 * Underlying send bulk routine used by the various methods which send
817 * a DRAMA message.
818 *
819 * @param action A pointer to the threaded action which is
820 * executing this operation.
821 * @param type Message type.
822 * @param name Name of the action to start.
823 * @param argIn The Bulk Data segment to be the argument to the
824 * obey message.
825 * @param isSds Does arg contain an SDS item? If true,
826 * the target task can access the item as an
827 * argument using SDS in the normal way.
828 * If false, the shared memory contains data in an
829 * application private format and DRAMA makes no attempt
830 * to interpret it.
831 * @param notifyBytes If non-zero, it indicates your action should
832 * be notified (using EntryCode::BulkTransferred
833 * entries) every time the specified number
834 * of bytes are transferred. Note that this effect
835 * is somewhat dependent on the target task
836 * behavior and you may only receive the final
837 * EntryCode::BulkDone message.
838 * @param eventProcessor An object to use to process events.
839 * All Send() does is to invoke the NewTransaction()
840 * method when it has sent the message.
841 *
842 * @return The DRAMA transaction ID of the message.
843 */
844 DitsTransIdType SendBulk(
845 thread::TMessHandler *action,
847 const std::string & name,
848 int flags,
849 BulkData *argIn,
850 bool isSds,
851 unsigned notifyBytes,
852 MessageEventHandler * const eventProcessor);
853
854
861 Path& operator=(const Path &rhs) = delete;
867 Path(const Path &source) = delete;
868
869 protected:
877 virtual void Disconnected();
878
879
880
881 public:
887 Path& operator=(Path &&rhs) = default;
892 Path(Path &&source) = default;
893 /* Normal constructor
894 *
895 * Allows the task name, host and file to be set
896 *
897 * @note DRAMA does not directly deal with long program names (value of
898 * the file argument). The limit is about 63 characters. The work
899 * around for this is to use environment variables known to IMP_Master
900 * on the machine doing the loading (node). The normal approach is to
901 * use the ENVVAR:program approach to specifying the node name. E.g.
902 * rather than specifying a filename like this:
903 * <pre>
904 * /directory1/diretory2/directory3/directory4/program
905 * </pre>
906 *
907 * There would be an environment variable known to the IMP_Master
908 * program defined like this:
909 * <pre>
910 * export ENVVAR=/directory1/diretory2/directory3/directory4/
911 * </pre>
912 * And then the ENVVAR:program name will work. Remember that
913 * IMP_Master is normally started using dits_netstart.
915 *
916 * @param theTask The DRAMA Task we are part of. This is passed
917 * by pointer, but Path is not taking ownership of
918 * the subject object, which must continue to exist
919 * until the Path object is destroyed.
920 * @param name The name the task will be known as (unless loaded,
921 * when the name the task registers as will be used)
922 * @param host The node name on which the task is running or
923 * is to be loaded. Only used if not already running
924 * locally or known locally.
925 * @param file The file of the task, as required by
926 * DCF(DitsLoad). Normally a file name, but may
927 * be different depending on the location, e.g.
928 * on a VxWorks machine.
929 */
930 Path(std::weak_ptr<Task> theTask,
931 const std::string &name,
932 const std::string &host = "",
933 const std::string &file = "");
934
945 explicit Path(std::weak_ptr<Task> theTask);
946
947
962 Path(std::weak_ptr<Task> theTask, DitsPathType path);
963
969 Path() { }
970
973 virtual ~Path();
983 /*
984 * Inquiries
985 */
990 bool GetPathLoaded() const { return (_loaded); };
999 const std::string GetTaskName() const;
1009 const std::string &GetHost() const { return (_taskLocation); }
1014 const std::string &GetArgument() const { return (_argument); };
1015
1016 /*
1017 * Routines to set various load and get path options.
1018 */
1032 virtual void SetName(const std::string &TaskName);
1040 virtual void SetHost(const std::string & Host) {
1041 _taskLocation = Host;
1042 }
1067 virtual void SetFile(const std::string &File) {
1068 _taskFile = File;
1069 }
1078 virtual void SetBuffers(const Buffers & buffs) {
1079 this->_buffers = buffs;
1080 }
1086 virtual void SetFlowControl() {
1087 _pathFlags |= DITS_M_FLOW_CONTROL;
1088 }
1100 virtual void SetProcess(const std::string &ProcessName);
1101
1110 virtual void SetStackSize(unsigned bytes);
1111
1122 virtual void SetArgument(const std::string &LoadArg,
1123 bool const Append=false);
1135 virtual void SetPriority(const int Priority,
1136 const bool Absolute = false);
1154 virtual void SetNames(const bool Flag = true) {
1155 if (Flag)
1156 _loadFlags |= DITS_M_NAMES;
1157 else
1158 _loadFlags &= ~DITS_M_NAMES;
1159 }
1172 virtual void SetSymbols(const bool Flag = true) {
1173 Task::guardType DramaLock(std::shared_ptr<Task>(_theTask)->Lock()); // Grab DRAMA Lock if we don't have it.
1174 if (Flag)
1175 _loadFlags |= DITS_M_SYMBOL;
1176 else
1177 _loadFlags &= ~DITS_M_SYMBOL;
1178 };
1195 virtual void SetProg(const bool Flag = true) {
1196 Task::guardType DramaLock(std::shared_ptr<Task>(_theTask)->Lock()); // Grab DRAMA Lock if we don't have it.
1197 if (Flag)
1198 _loadFlags |= DITS_M_PROG;
1199 else
1200 _loadFlags &= ~DITS_M_PROG;
1201 };
1209 virtual void LogLoad(const bool Flag = true) {
1210 _logLoad = Flag;
1211 }
1212
1218 virtual void ClearState() {
1219 _state = PathState::INITIAL;
1220 };
1227 virtual bool Active() const {
1228 return ((_state == PathState::ACTIVE)||
1229 (_state == PathState::SELFINIT));
1230 }
1235 virtual bool IsDead() const {
1236 return (_state == PathState::DIED);
1237 }
1242 virtual bool Initial() const {
1243 return ((_state == PathState::INITIAL)||
1244 (_state == PathState::DIED));
1253 virtual bool Busy() const {
1254 return ((_state == PathState::PATHWAIT) ||
1255 (_state == PathState::LOADING) ||
1256 (_state == PathState::NOTIFY));
1257 }
1258
1263 virtual std::shared_ptr<Task> GetTask() const {
1264 return std::shared_ptr<Task>(_theTask);
1265 }
1266
1288 void GetPath(thread::TMessHandler *action,
1289 thread::TransEvtProcessor * const eventProcessor = &_getPathEventProcessor );
1324 bool GetPathWaitUntil(std::chrono::steady_clock::time_point until,
1326 thread::TransEvtProcessor * const eventProcessor = &_getPathEventProcessor );
1327
1346 const std::string &name,
1348 sds::IdPtr * const argOut = nullptr,
1349 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1350
1351#if 0
1352 fprintf(stderr,"!!!OBey - sending action %s\n", name.c_str());
1353#endif
1354
1355 Send(action, DITS_MSG_OBEY, name, 0 /* Flags */, argIn, eventProcessor);
1356#if 0
1357 fprintf(stderr,"!!!Obey sent, waiting for reply\n");
1358#endif
1359 TransEvtInfo eventInfo;
1360 action->WaitForTransaction(_theTask,
1361 &eventInfo, eventProcessor, argOut);
1362#if 0
1363 fprintf(stderr,"!!!Obey sent, wait complete\n");
1364#endif
1365/* This should not be needed, event processor will throw if needed */
1366/* if (eventInfo.entryStatus != STATUS__OK)
1367 {
1368 DramaTHROW_S(
1369 eventInfo.entryStatus,
1370 "Obey of message % to task % completed with error",
1371 name, _taskName);
1373 else */ if (eventInfo.complete == 0)
1374 {
1376 eventInfo.entryStatus,
1377 "Obey of message % to task %, returned but not complete",
1378 name, _taskName);
1379 }
1380
1381 }
1382
/**
 * Convert a whole number of seconds from now into a steady-clock
 * time point.
 *
 * @param secs Offset from the current time, in seconds.
 * @return The current std::chrono::steady_clock time plus the offset.
 */
static std::chrono::steady_clock::time_point DeltaToTimePoint(int secs)
{
    const std::chrono::seconds offset(secs);
    return std::chrono::steady_clock::now() + offset;
}
1395
1426 bool ObeyWaitUntil(std::chrono::steady_clock::time_point until,
1427 thread::TMessHandler *action,
1428 const std::string &name,
1429 const sds::Id &argIn = sds::Id::CreateNullItem(),
1430 sds::IdPtr * const argOut = nullptr,
1431 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1432
1433 Send(action, DITS_MSG_OBEY, name, 0 /* Flags */, argIn, eventProcessor);
1434
1435 TransEvtInfo eventInfo;
1436 bool ok = action->WaitForTransactionUntil(
1437 _theTask,
1438 &eventInfo, eventProcessor,
1439 until, argOut);
1440
1441 if (!ok) return false;
1442
1443/* This should not be needed, event processor will throw if needed */
1444/* if (eventInfo.entryStatus != STATUS__OK)
1445 {
1446 DramaTHROW_S(
1447 eventInfo.entryStatus,
1448 "Obey of action % to task % completed with error",
1449 name, _taskName);
1450 }
1451 else */ if (eventInfo.complete == 0)
1452 {
1454 eventInfo.entryStatus,
1455 "Obey of action % to task %, returned but not complete",
1456 name, _taskName);
1457 }
1458 return true;
1459
1460 }
1461
1493 void ObeyBulk(thread::TMessHandler *action,
1494 const std::string &name,
1495 BulkData *argIn,
1496 bool isSds,
1497 unsigned notifyBytes=1024*1024,
1498 sds::IdPtr * const argOut = nullptr,
1499 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1500
1501#if 0
1502 fprintf(stderr,"!!!OBeyBulk - sending action %s\n", name.c_str());
1503#endif
1504
1505 SendBulk(action, DITS_MSG_OBEY, name, 0 /* Flags */, argIn, isSds,
1507#if 0
1508 fprintf(stderr,"!!!ObeyBulk sent, waiting for reply\n");
1509#endif
1510 TransEvtInfo eventInfo;
1511 action->WaitForTransaction(_theTask,
1512 &eventInfo, eventProcessor, argOut);
1513#if 0
1514 fprintf(stderr,"!!!ObeyBulk sent, wait complete\n");
1515#endif
1516/* This should not be needed, event processor will throw if needed */
1517/* if (eventInfo.entryStatus != STATUS__OK)
1518 {
1519 DramaTHROW_S(
1520 eventInfo.entryStatus,
1521 "ObeyBulk of message % to task % completed with error",
1522 name, _taskName);
1523 }
1524 else */ if (eventInfo.complete == 0)
1525 {
1527 eventInfo.entryStatus,
1528 "ObeyBulk of message % to task %, returned but not complete",
1529 name, _taskName);
1530 }
1531
1532 }
1533
1534
1560 void ObeyBulk(thread::TMessHandler *action,
1561 const std::string &name,
1562 BulkDataSds *argIn,
1563 unsigned notifyBytes=1024*1024,
1564 sds::IdPtr * const argOut = nullptr,
1565 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1566
1567
1569 }
1570
1571
1572
1619 bool ObeyBulkWaitUntil(std::chrono::steady_clock::time_point until,
1620 thread::TMessHandler *action,
1621 const std::string &name,
1622 BulkData *argIn,
1623 bool isSds,
1624 unsigned notifyBytes=1024*1024,
1625 sds::IdPtr * const argOut = nullptr,
1626 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1627
1628 SendBulk(action, DITS_MSG_OBEY, name, 0 /* Flags */, argIn,
1630
1631 TransEvtInfo eventInfo;
1632 bool ok = action->WaitForTransactionUntil(
1633 _theTask,
1634 &eventInfo, eventProcessor,
1635 until, argOut);
1636
1637 if (!ok) return false;
1638
1639/* This should not be needed, event processor will throw if needed */
1640/* if (eventInfo.entryStatus != STATUS__OK)
1641 {
1642 DramaTHROW_S(
1643 eventInfo.entryStatus,
1644 "Obey of action % to task % completed with error",
1645 name, _taskName);
1647 else*/ if (eventInfo.complete == 0)
1648 {
1650 eventInfo.entryStatus,
1651 "Obey of action % to task %, returned but not complete",
1652 name, _taskName);
1653 }
1654 return true;
1655
1656 }
1657
1658
1699 bool ObeyBulkWaitUntil(std::chrono::steady_clock::time_point until,
1700 thread::TMessHandler *action,
1701 const std::string &name,
1702 BulkDataSds *argIn,
1703 unsigned notifyBytes=1024*1024,
1704 sds::IdPtr * const argOut = nullptr,
1705 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1706
1707
1708 return ObeyBulkWaitUntil(until, action, name, argIn, true,
1710
1711 }
1730 void Kick(thread::TMessHandler *action,
1731 const std::string &name,
1732 const sds::Id &argIn = sds::Id::CreateNullItem(),
1733 sds::IdPtr * const argOut = nullptr,
1734 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1735
1736 Send(action, DITS_MSG_KICK, name, 0 /* Flags */, argIn, eventProcessor);
1737
1738 TransEvtInfo eventInfo;
1739 action->WaitForTransaction(_theTask,
1740 &eventInfo, eventProcessor,
1741 argOut);
1742/* This should not be needed, event processor will throw if needed */
1743/* if (eventInfo.entryStatus != STATUS__OK)
1744 {
1745 DramaTHROW_S(
1746 eventInfo.entryStatus,
1747 "Kick of action % in task % completed with error",
1748 name, _taskName);
1749 }
1750 else */ if (eventInfo.complete == 0)
1751 {
1753 eventInfo.entryStatus,
1754 "Kick of action % in task %, returned but not complete",
1755 name, _taskName);
1756 }
1758 }
1759
1790 bool KickWaitUntil(std::chrono::steady_clock::time_point until,
1792 const std::string &name,
1794 sds::IdPtr * const argOut = nullptr,
1795 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1796
1797 Send(action, DITS_MSG_KICK, name, 0 /* Flags */, argIn, eventProcessor);
1798
1799 TransEvtInfo eventInfo;
1800 bool ok = action->WaitForTransactionUntil(
1801 _theTask,
1802 &eventInfo, eventProcessor,
1803 until, argOut);
1804
1805 if (!ok) return false;
1806
1807/* This should not be needed, event processor will throw if needed */
1808/* if (eventInfo.entryStatus != STATUS__OK)
1809 {
1810 DramaTHROW_S(
1811 eventInfo.entryStatus,
1812 "Kick of action % in task % completed with error",
1813 name, _taskName);
1814 }
1815 else */ if (eventInfo.complete == 0)
1816 {
1818 eventInfo.entryStatus,
1819 "Kick of action % in task %, returned but not complete",
1820 name, _taskName);
1821 }
1822 return true;
1823
1824 }
1825
1858 void KickBulk(thread::TMessHandler *action,
1859 const std::string &name,
1860 BulkData *argIn,
1861 bool isSds,
1862 unsigned notifyBytes=1024*1024,
1863 sds::IdPtr * const argOut = nullptr,
1864 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1865
1866 SendBulk(action, DITS_MSG_KICK, name, 0 /* Flags */, argIn, isSds,
1868
1869 TransEvtInfo eventInfo;
1870 action->WaitForTransaction(_theTask,
1871 &eventInfo, eventProcessor,
1872 argOut);
1873/* This should not be needed, event processor will throw if needed */
1874/* if (eventInfo.entryStatus != STATUS__OK)
1875 {
1876 DramaTHROW_S(
1877 eventInfo.entryStatus,
1878 "Kick of action % in task % completed with error",
1879 name, _taskName);
1880 }
1881 else */ if (eventInfo.complete == 0)
1882 {
1884 eventInfo.entryStatus,
1885 "Kick of action % in task %, returned but not complete",
1886 name, _taskName);
1887 }
1888
1889 }
1890
1917 void KickBulk(thread::TMessHandler *action,
1918 const std::string &name,
1919 BulkDataSds *argIn,
1920 unsigned notifyBytes=1024*1024,
1921 sds::IdPtr * const argOut = nullptr,
1922 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1923
1925 }
1926
1927
1928
1973 bool KickBulkWaitUntil(std::chrono::steady_clock::time_point until,
1974 thread::TMessHandler *action,
1975 const std::string &name,
1976 BulkData *argIn,
1977 bool isSds,
1978 unsigned notifyBytes=1024*1024,
1979 sds::IdPtr * const argOut = nullptr,
1980 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
1981
1982 SendBulk(action, DITS_MSG_KICK, name, 0 /* Flags */, argIn, isSds,
1984
1985 TransEvtInfo eventInfo;
1986 bool ok = action->WaitForTransactionUntil(
1987 _theTask,
1988 &eventInfo, eventProcessor,
1989 until, argOut);
1990
1991 if (!ok) return false;
1992
1993/* This should not be needed, event processor will throw if needed */
1994/* if (eventInfo.entryStatus != STATUS__OK)
1995 {
1996 DramaTHROW_S(
1997 eventInfo.entryStatus,
1998 "Kick of action % in task % completed with error",
1999 name, _taskName);
2001 else */ if (eventInfo.complete == 0)
2002 {
2004 eventInfo.entryStatus,
2005 "Kick of action % in task %, returned but not complete",
2006 name, _taskName);
2007 }
2008 return true;
2009
2010 }
2011
2012
2051 bool KickBulkWaitUntil(std::chrono::steady_clock::time_point until,
2052 thread::TMessHandler *action,
2053 const std::string &name,
2054 BulkDataSds *argIn,
2055 unsigned notifyBytes=1024*1024,
2056 sds::IdPtr * const argOut = nullptr,
2057 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2058 return KickBulkWaitUntil(until, action, name, argIn,
2060 }
2061
2062
2075 void SetParam(thread::TMessHandler *action,
2076 const std::string &name,
2077 const sds::Id &argIn,
2078 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2079
2080 Send(action, DITS_MSG_SETPARAM, name, 0 /* Flags */, argIn, eventProcessor);
2081
2082 TransEvtInfo eventInfo;
2083 action->WaitForTransaction(_theTask,
2084 &eventInfo, eventProcessor);
2085/* This should not be needed, event processor will throw if needed */
2086/* if (eventInfo.entryStatus != STATUS__OK)
2087 {
2088 DramaTHROW_S(
2089 eventInfo.entryStatus,
2090 "Set of parameter % in task % completed with error",
2091 name, _taskName);
2092 }
2093 else */ if (eventInfo.complete == 0)
2094 {
2096 eventInfo.entryStatus,
2097 "Set of parameter % in task %, returned but not complete",
2098 name, _taskName);
2099 }
2100
2101 }
2127 bool SetParamWaitUntil(std::chrono::steady_clock::time_point until,
2129 const std::string &name,
2130 const sds::Id &argIn,
2131 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2132
2133 Send(action, DITS_MSG_SETPARAM, name, 0 /* Flags */, argIn, eventProcessor);
2134
2135 TransEvtInfo eventInfo;
2136 bool ok = action->WaitForTransactionUntil(
2137 _theTask,
2138 &eventInfo, eventProcessor,
2139 until);
2140
2141 if (!ok) return false;
2142
2143/* This should not be needed, event processor will throw if needed */
2144/* if (eventInfo.entryStatus != STATUS__OK)
2145 {
2146 DramaTHROW_S(
2147 eventInfo.entryStatus,
2148 "Set of parameter % in task % completed with error",
2149 name, _taskName);
2150 }
2151 else */ if (eventInfo.complete == 0)
2152 {
2154 eventInfo.entryStatus,
2155 "Set of message % in task %, returned but not complete",
2156 name, _taskName);
2157 }
2158 return true;
2159
2160 }
2161
2162
2175 void GetParam(thread::TMessHandler *action,
2176 const std::string &name,
2177 sds::IdPtr * const argOut,
2178 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2179 Send(action, DITS_MSG_GETPARAM, name, 0 /* Flags */,
2180 sds::Id::CreateNullItem() /* arg */,
2182
2183 TransEvtInfo eventInfo;
2184 action->WaitForTransaction(_theTask,
2185 &eventInfo, eventProcessor,
2186 argOut);
2187/* This should not be needed, event processor will throw if needed */
2188/* if (eventInfo.entryStatus != STATUS__OK)
2189 {
2190 DramaTHROW_S(
2191 eventInfo.entryStatus,
2192 "Get of parameter % from task % completed with error",
2193 name, _taskName);
2194 }
2195 else */ if (eventInfo.complete == 0)
2196 {
2198 eventInfo.entryStatus,
2199 "Get of parameter % from task %, returned but not complete",
2200 name, _taskName);
2201 }
2203 }
2219 const std::initializer_list<std::string> names,
2220 sds::IdPtr * const argOut,
2221 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2223
2224 Send(action, DITS_MSG_MGETPARAM, "", 0 /* Flags */, argIn,
2226
2227 TransEvtInfo eventInfo;
2228 action->WaitForTransaction(_theTask,
2229 &eventInfo, eventProcessor, argOut);
2230/* This should not be needed, event processor will throw if needed */
2231/* if (eventInfo.entryStatus != STATUS__OK)
2232 {
2233 DramaTHROW_S(
2234 eventInfo.entryStatus,
2235 "Get of parameters from task % completed with error",
2236 _taskName);
2237 }
2238 else */ if (eventInfo.complete == 0)
2239 {
2241 eventInfo.entryStatus,
2242 "Get of parameters from task %, returned but not complete",
2243 _taskName);
2244 }
2246 }
2247
2248
2249
2250
2276 bool GetParamWaitUntil(std::chrono::steady_clock::time_point until,
2278 const std::string &name,
2279 sds::IdPtr * const argOut,
2280 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2281 Send(action, DITS_MSG_GETPARAM, name, 0 /* Flags */,
2282 sds::Id::CreateNullItem() /* arg */,
2284
2285 TransEvtInfo eventInfo;
2286 bool ok = action->WaitForTransactionUntil(
2287 _theTask,
2288 &eventInfo, eventProcessor,
2289 until, argOut);
2290 if (!ok) return false;
2291
2292/* This should not be needed, event processor will throw if needed */
2293/* if (eventInfo.entryStatus != STATUS__OK)
2294 {
2295 DramaTHROW_S(
2296 eventInfo.entryStatus,
2297 "Get of parameter % from task % completed with error",
2298 name, _taskName);
2299 }
2300 else */ if (eventInfo.complete == 0)
2301 {
2303 eventInfo.entryStatus,
2304 "Get of parameter % from task %, returned but not complete",
2305 name, _taskName);
2306 }
2307 return true;
2308
2309 }
2336 bool GetParamWaitUntil(std::chrono::steady_clock::time_point until,
2337 thread::TMessHandler *action,
2338 const std::initializer_list<std::string> names,
2339 sds::IdPtr * const argOut,
2340 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2342
2343 Send(action, DITS_MSG_MGETPARAM, "", 0 /* Flags */, argIn,
2345
2346 TransEvtInfo eventInfo;
2347 bool ok = action->WaitForTransactionUntil(
2348 _theTask,
2349 &eventInfo, eventProcessor,
2350 until, argOut);
2351 if (!ok) return false;
2352/* This should not be needed, event processor will throw if needed */
2353/* if (eventInfo.entryStatus != STATUS__OK)
2354 {
2355 DramaTHROW_S(
2356 eventInfo.entryStatus,
2357 "Get of parameters from task % completed with error",
2358 _taskName);
2359 }
2360 else */ if (eventInfo.complete == 0)
2361 {
2363 eventInfo.entryStatus,
2364 "Get of parameters from task %, returned but not complete",
2365 _taskName);
2366 }
2367 return true;
2368
2369 }
2370
2371
2390 void Control(thread::TMessHandler *action,
2391 const std::string &name,
2392 const sds::Id &argIn = sds::Id::CreateNullItem(),
2393 sds::IdPtr * const argOut = nullptr,
2394 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2395
2396 Send(action, DITS_MSG_CONTROL, name, 0 /* Flags */, argIn,
2398
2399 TransEvtInfo eventInfo;
2400 action->WaitForTransaction(_theTask,
2401 &eventInfo, eventProcessor,
2402 argOut);
2403/* This should not be needed, event processor will throw if needed */
2404/* if (eventInfo.entryStatus != STATUS__OK)
2405 {
2406 DramaTHROW_S(
2407 eventInfo.entryStatus,
2408 "Control message % to task % completed with error",
2409 name, _taskName);
2410 }
2411 else */ if (eventInfo.complete == 0)
2412 {
2414 eventInfo.entryStatus,
2415 "Control message % to task %, returned but not complete",
2416 name, _taskName);
2418
2419 }
2420
/**
 * Sends a control message (DITS_MSG_CONTROL) to the task and blocks the
 * current thread until the reply is received or the wait call gives up at
 * the `until` time point.
 *
 * @param until          Time point handed to WaitForTransactionUntil().
 * @param action         Handler used to send the message and wait on the
 *                       resulting transaction.
 * @param name           Message name handed to Send().
 * @param argIn          Argument handed to Send(); defaults to
 *                       sds::Id::CreateNullItem().
 * @param argOut         Handed to the wait call as the output argument;
 *                       defaults to nullptr.
 * @param eventProcessor Event handler handed to the wait call; defaults to
 *                       &_simpleEventProcessor.
 * @return false when WaitForTransactionUntil() returns false, true otherwise.
 *
 * NOTE(review): listing lines 2459, 2478 and 2479 are absent from this
 * extract.  Presumably they finish the Send() call, open the if block and
 * open a DramaTHROW_S() call - confirm against the real path.hh.
 */
2451 bool ControlWaitUntil(std::chrono::steady_clock::time_point until,
2452 thread::TMessHandler *action,
2453 const std::string &name,
2454 const sds::Id &argIn = sds::Id::CreateNullItem(),
2455 sds::IdPtr * const argOut = nullptr,
2456 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2457
// The message is sent with a flags value of 0.
2458 Send(action, DITS_MSG_CONTROL, name, 0 /* Flags */, argIn,
2460
2461 TransEvtInfo eventInfo;
// A false result from the wait is passed straight back to the caller.
2462 bool ok = action->WaitForTransactionUntil(
2463 _theTask,
2464 &eventInfo, eventProcessor,
2465 until, argOut);
2466
2467 if (!ok) return false;
2468
// The entry status check below is commented out; only the
// "returned but not complete" case is still examined.
2469/* This should not be needed, event processor will throw if needed */
2470/* if (eventInfo.entryStatus != STATUS__OK)
2471 {
2472 DramaTHROW_S(
2473 eventInfo.entryStatus,
2474 "Control message % to task % completed with error",
2475 name, _taskName);
2476 }
2477 else */ if (eventInfo.complete == 0)
2480 eventInfo.entryStatus,
2481 "Control message % to task %, returned but not complete",
2482 name, _taskName);
2483 }
2484 return true;
2485
2486 }
2487
2488
/** Get a path to a task if DITS already has it (declaration only; defined elsewhere). */
2499 virtual void GetPathImmed();
2500
2501
2502
/**
 * Note that the task has died: the stored path is reset to 0 and the path
 * state is set to PathState::DIED.
 */
2505 virtual void Died() {
2506 _path = 0;
2507 _state = PathState::DIED;
2508 }
/** Lose the path and then set state as if the task has died (declaration only; defined elsewhere). */
2519 virtual void LosePath();
/**
 * Delete the task (declaration only; defined elsewhere).
 * NOTE(review): the meaning of `force` is not visible here - presumably a
 * non-zero value requests a forced delete; confirm against the definition.
 */
2528 virtual void Delete(int force);
2529
2530
2531
2550 const sds::Id &argIn,
2552
// MonitorStart: waits, without a time limit, for the monitor start
// transaction to finish, then checks that it is flagged complete.
// NOTE(review): listing lines 2549, 2551, 2554-2555, 2572 and 2576 are
// absent from this extract.  Presumably they hold the first and last
// signature lines, the Send() call, the DramaTHROW_S() opener and the
// closing brace of the if block - confirm against the real path.hh.
2553 //fprintf(stderr,"MonitorStart - about to send monitor\n");
2556
2557 TransEvtInfo eventInfo;
2558 //fprintf(stderr,"MonitorStart - about to wait\n");
// No output argument is requested from the wait (nullptr is passed).
2559 action->WaitForTransaction(_theTask,
2560 &eventInfo, eventProcessor, nullptr);
2561 //fprintf(stderr,"MonitorStart - wait complete.\n");
// The entry status check below is commented out; only the
// "returned but not complete" case is still examined.
2562/* This should not be needed, event processor will throw if needed */
2563/* if (eventInfo.entryStatus != STATUS__OK)
2564 {
2565 DramaTHROW_S(
2566 eventInfo.entryStatus,
2567 "Monitor Start message to task % completed with error",
2568 _taskName);
2569 }
2570 else */ if (eventInfo.complete == 0)
2571 {
2573 eventInfo.entryStatus,
2574 "Monitor START to task %, returned but not complete",
2575 _taskName);
2577
2578 }
/**
 * Sends a Monitor Forward message (DITS_MSG_MONITOR, name "FORWARD") to the
 * task and blocks the current thread until it is complete.
 *
 * @param action         Handler used to send the message and wait on the
 *                       resulting transaction.
 * @param argIn          Argument handed to Send().
 * @param eventProcessor Event handler handed to the wait call; defaults to
 *                       &_simpleMonEventProcessor.
 *
 * NOTE(review): listing lines 2603 and 2618 are absent from this extract.
 * Presumably they finish the Send() call and open a DramaTHROW_S() call -
 * confirm against the real path.hh.
 */
2598 void MonitorForward(thread::TMessHandler *action,
2599 const sds::Id &argIn,
2600 MessageEventHandler * const eventProcessor = &_simpleMonEventProcessor) {
2601
// The message is sent with the DITS_M_SENDCUR flag.
2602 Send(action, DITS_MSG_MONITOR, "FORWARD", DITS_M_SENDCUR, argIn,
2604
2605 TransEvtInfo eventInfo;
// No output argument is requested from the wait (nullptr is passed).
2606 action->WaitForTransaction(_theTask,
2607 &eventInfo, eventProcessor, nullptr);
// The entry status check below is commented out; only the
// "returned but not complete" case is still examined.
2608/* This should not be needed, event processor will throw if needed */
2609/* if (eventInfo.entryStatus != STATUS__OK)
2610 {
2611 DramaTHROW_S(
2612 eventInfo.entryStatus,
2613 "Monitor FORWARD message to task % completed with error",
2614 _taskName);
2615 }
2616 else */ if (eventInfo.complete == 0)
2617 {
2619 eventInfo.entryStatus,
2620 "Monitor FORWARD to task %, returned but not complete",
2621 _taskName);
2622 }
2623
2624 }
2626
2627
2644 const std::string &task,
2645 const std::string action,
2646 std::initializer_list<std::string> pars,
2647 MessageEventHandler * const eventProcessor = &_simpleMonEventProcessor) {
2648
// MonitorForward (initializer_list overload): fills `args` with a "TASK"
// item, an "ACTION" item and then the entries of `pars`.
// NOTE(review): listing lines 2643, 2649 and 2654 are absent from this
// extract.  Presumably they hold the first signature line, the creation of
// `args` and the call that forwards it - confirm against the real path.hh.
2650 args.Put("TASK", task);
2651 args.Put("ACTION", action);
// NOTE(review): the 3 is presumably the index of the first argument slot
// used for `pars`, following TASK and ACTION - confirm.
2652 args.AddToArgCmdStruct(pars, 3);
2653
2655
2656 }
// MonitorForward (container overload): same argument layout as the
// initializer_list overload, but `pars` may be any container type whose
// value_type is convertible to std::string (enforced at compile time).
// NOTE(review): listing lines 2681, 2691 and 2696 are absent from this
// extract.  Presumably they hold the first signature line, the creation of
// `args` and the call that forwards it - confirm against the real path.hh.
2680 template <typename ContainerType>
2682 const std::string &task,
2683 const std::string action,
2684 const ContainerType &pars,
2685 MessageEventHandler * const eventProcessor = &_simpleMonEventProcessor) {
2686
// Reject containers whose elements cannot be converted to std::string.
2687 static_assert(
2688 std::is_convertible<typename ContainerType::value_type,std::string>::value,
2689 "The container value type must be convertible to a std::string");
2690
2692 args.Put("TASK", task);
2693 args.Put("ACTION", action);
// NOTE(review): the 3 is presumably the index of the first argument slot
// used for `pars`, following TASK and ACTION - confirm.
2694 args.AddToArgCmdStruct(pars, 3);
2695
2697
2698 }
2699
2700
/**
 * Sends a Monitor Cancel message (DITS_MSG_MONITOR, name "CANCEL") to the
 * task and blocks the current thread until it is complete.
 *
 * @param action         Handler used to send the message and wait on the
 *                       resulting transaction.
 * @param monId          Monitor identifier; it is wrapped as the single
 *                       INT32 entry of the command argument structure.
 * @param eventProcessor Event handler handed to the wait call; defaults to
 *                       &_simpleEventProcessor.
 *
 * NOTE(review): listing lines 2717, 2732 and 2738 are absent from this
 * extract.  Presumably they finish the Send() call, open a DramaTHROW_S()
 * call and close the function - confirm against the real path.hh.
 */
2711 void MonitorCancel(thread::TMessHandler *action,
2712 int monId,
2713 MessageEventHandler * const eventProcessor = &_simpleEventProcessor) {
2714
// Build the argument structure from a one-element vector holding monId.
2715 sds::Id argIn = sds::Id::CreateArgCmdStruct(std::vector<INT32>{monId});
// The message is sent with the DITS_M_SENDCUR flag.
2716 Send(action, DITS_MSG_MONITOR, "CANCEL", DITS_M_SENDCUR, argIn,
2718
2719 TransEvtInfo eventInfo;
// No output argument is requested from the wait (nullptr is passed).
2720 action->WaitForTransaction(_theTask,
2721 &eventInfo, eventProcessor, nullptr);
// The entry status check below is commented out; only the
// "returned but not complete" case is still examined.
2722/* This should not be needed, event processor will throw if needed */
2723/* if (eventInfo.entryStatus != STATUS__OK)
2724 {
2725 DramaTHROW_S(
2726 eventInfo.entryStatus,
2727 "Monitor CANCEL message to task % completed with error",
2728 _taskName);
2729 }
2730 else */ if (eventInfo.complete == 0)
2731 {
2733 eventInfo.entryStatus,
2734 "Monitor CANCEL to task %, returned but not complete",
2735 _taskName);
2736 }
2737
2739
2740
/**
 * Return the underlying DITS path object (declaration only; defined elsewhere).
 * NOTE(review): `nothrow` presumably suppresses the exception otherwise
 * raised when no path is available - confirm against the definition.
 */
2753 virtual DitsPathType GetDitsPath(bool nothrow=false);
2754
// Handle task disconnection (declaration only; defined elsewhere).
// NOTE(review): listing line 2766, presumably the final `status` parameter
// and the closing of the declaration, is absent from this extract.
2764 static DVOID HandleDisconnected(const char *task,
2765 DitsPathType path,
2767
2776
2785
2786
2787 }; /* class Path */
2788
2789
2790} // namespace drama
2792#endif
2793
long int MaxMessages() const
Return the Max Messages specification.
Definition path.hh:165
Buffers(const DitsPathInfoType &info)
Construct a drama::Buffers object from a DitsPathInfoType variable.
Definition path.hh:150
long int MaxReplies() const
Return the Max Replies specification.
Definition path.hh:175
long int MessageBytes() const
Return the message bytes specification.
Definition path.hh:160
long int ReplyBytes() const
Return the reply bytes specification.
Definition path.hh:170
Buffers(long int MessageBytes=800, long int MaxMessages=2, long int ReplyBytes=800, long int MaxReplies=10)
Construct a drama::Buffers object from individual specifications.
Definition path.hh:135
A Class which supports setting up DRAMA message buffer structures.
Definition path.hh:120
Defines and optionally creates a shared memory section containing an SDS structure.
Definition bulkdata.hh:434
Defines and optionally creates a shared memory section.
Definition bulkdata.hh:121
virtual bool Process(thread::ProcessInfo messInfo, const TransEvtInfo &eventInfo, const sds::IdPtr &arg) override
This method is invoked for each message received in response to a GetPath operation.
virtual void NewTransaction(DitsMsgType msgType, DitsTransIdType tid) override
This method is invoked each time a message is initiated.
Default event handler type for GetPath() calls.
Definition path.hh:476
void SetStatus(EventStatus newVal)
If a sub-class overrides one of the methods of MessageEventHandler that this class overrides,...
Definition path.hh:588
void NewTransaction(DitsMsgType msgType, DitsTransIdType tid) override
Invoked when a new transaction is started.
virtual void TaskDied(drama::thread::ProcessInfo messInfo, StatusType status) override
Invoked if the subsidiary task died whilst we were waiting for a reply from it.
std::string GetStr() const
Return a string version of the current event status values.
Definition path.hh:557
bool operator()() const
Indicate if the message is outstanding (running)
Definition path.hh:564
EventStatus
Detailed status of the message is returned using this enum.
Definition path.hh:534
@ Completed
The message has completed successfully.
@ Running
The message has been sent and has not completed.
@ Failed
The message has completed with a failure.
@ NotStarted
The message has not been sent as yet.
IsRunningType()
Constructor.
Definition path.hh:577
EventStatus Get() const
Return the status of the message.
Definition path.hh:544
void MessageRejected(drama::thread::ProcessInfo messInfo, StatusType status) override
Invoked for when a message is rejected.
std::string GetStr(EventStatus val) const
Return a string version of a event status value.
virtual void MessageComplete(drama::thread::ProcessInfo messInfo, StatusType status, const drama::sds::IdPtr &arg) override
Invoked if a message complete message is received.
A message event handling object that maintains a variable indicating if the message is outstanding.
Definition path.hh:529
virtual void BulkTransferred(thread::ProcessInfo messInfo, DitsBulkInfoType bulkInfo)
Invoked if a bulk data transferred message is received.
virtual void UserMessage(thread::ProcessInfo messInfo, const std::string &task, const std::string &message)
Invoked if a message for the user is received.
virtual void SignalReceived(thread::ProcessInfo messInfo, const sds::IdPtr &arg)
Invoked if a signal for the action is received.
virtual void NewTransaction(DitsMsgType msgType, DitsTransIdType tid) override
This method is invoked each time a message is initiated.
virtual void MessageRejected(thread::ProcessInfo messInfo, StatusType status)
Invoked if the message sent was rejected by the target task.
virtual void BulkDone(thread::ProcessInfo messInfo)
Invoked if a bulk data done message is received.
virtual void ThreadWaitAbort(thread::ProcessInfo messInfo, StatusType status)
Invoked if the thread wait is aborted.
virtual bool KickReceived(thread::ProcessInfo messInfo, const sds::IdPtr &arg)
Invoked if an action waiting on a message receives a kick message during the wait.
virtual bool Process(thread::ProcessInfo messInfo, const TransEvtInfo &eventInfo, const sds::IdPtr &arg) override final
Method invoked to process messages.
virtual void TaskDied(thread::ProcessInfo messInfo, StatusType status)
Invoked if the subsidiary task died whilst we were waiting for a reply from it.
virtual void MessageComplete(thread::ProcessInfo messInfo, StatusType status, const sds::IdPtr &arg)
Invoked if a message complete message is received.
virtual void TriggerReceived(thread::ProcessInfo messInfo, StatusType status, const sds::IdPtr &arg)
Invoked if a trigger message is received.
virtual void ErrorReport(thread::ProcessInfo messInfo, const std::string &task, const ErsMessageVector &messages)
Invoked if an error report for the user is received.
Message event handling objects.
Definition path.hh:194
virtual void TriggerReceived(thread::ProcessInfo messInfo, StatusType status, const sds::IdPtr &arg) override
Invoked if a trigger message is received.
Forward monitor message event handling objects.
Definition path.hh:452
void Kick(thread::TMessHandler *action, const std::string &name, const sds::Id &argIn=sds::Id::CreateNullItem(), sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a kick message to a task and blocks the current thread until the reply is received.
Definition path.hh:1757
virtual void SetPriority(const int Priority, const bool Absolute=false)
Set the load time priority for a task when loaded.
virtual void Disconnected()
Invoked when the task we are talking to disappears.
virtual ~Path()
Destructor.
void KickBulk(thread::TMessHandler *action, const std::string &name, BulkData *argIn, bool isSds, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a kick message, with bulk data argument, to a task and blocks the current thread until the repl...
Definition path.hh:1885
void MonitorForward(thread::TMessHandler *taction, const std::string &task, const std::string action, const ContainerType &pars, MessageEventHandler *const eventProcessor=&_simpleMonEventProcessor)
Send a Monitor Forward message to a task and block until it is complete.
Definition path.hh:2708
Path(Path &&source)=default
Move copy constructor.
void Report(thread::TMessHandler *action) const
Dump details using <a href="../routines/MsgOut.html">MsgOut()</a>.
void MonitorStart(thread::TMessHandler *action, const sds::Id &argIn, MessageEventHandler *const eventProcessor)
Send a Monitor Start message to a task and block until it is complete.
Definition path.hh:2576
virtual void SetFlowControl()
Set the FLOW control flag.
Definition path.hh:1113
virtual void LosePath()
Lose the path and then set as if the task has died.
void ObeyBulk(thread::TMessHandler *action, const std::string &name, BulkData *argIn, bool isSds, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends an obey message to a task, with bulk data argument, and blocks the current thread until the act...
Definition path.hh:1520
bool GetPathWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, thread::TransEvtProcessor *const eventProcessor=&_getPathEventProcessor)
Get a path to the task.
bool ObeyBulkWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, BulkData *argIn, bool isSds, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends an obey message to a task, with a bulk data argument, and blocks the current thread until the a...
Definition path.hh:1646
void GetParam(thread::TMessHandler *action, const std::initializer_list< std::string > names, sds::IdPtr *const argOut, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a get parameter message to a task and blocks the current thread until it completes.
Definition path.hh:2245
void MonitorForward(thread::TMessHandler *taction, const std::string &task, const std::string action, std::initializer_list< std::string > pars, MessageEventHandler *const eventProcessor=&_simpleMonEventProcessor)
Send a Monitor Forward message to a task and block until it is complete.
Definition path.hh:2670
const std::string & GetArgument() const
Fetch the load argument.
Definition path.hh:1041
const std::string & GetHost() const
Fetch the task host name.
Definition path.hh:1036
virtual void SetBuffers(const Buffers &buffs)
Set the DRAMA path buffers.
Definition path.hh:1105
void SetParam(thread::TMessHandler *action, const std::string &name, const sds::Id &argIn, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a set parameter message to a task and blocks the current thread until it completes.
Definition path.hh:2102
bool KickWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, const sds::Id &argIn=sds::Id::CreateNullItem(), sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a kick message to a task and block the current thread until the reply is received or a timeout ...
Definition path.hh:1817
bool GetParamWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, sds::IdPtr *const argOut, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a get parameter message to a task and blocks the current thread until it completes.
Definition path.hh:2303
bool SetParamWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, const sds::Id &argIn, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a set parameter message to a task and blocks the current thread until it completes or a timeout...
Definition path.hh:2154
void GetParam(thread::TMessHandler *action, const std::string &name, sds::IdPtr *const argOut, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a get parameter message to a task and blocks the current thread until it completes.
Definition path.hh:2202
virtual bool Active() const
Indicate if a task is active.
Definition path.hh:1254
static void HandleDisconnected(const char *task, DitsPathType path, StatusType *status)
Handle task disconnection.
Path & operator=(Path &&rhs)=default
Move assignment operator.
void ObeyBulk(thread::TMessHandler *action, const std::string &name, BulkDataSds *argIn, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends an obey message to a task, with SDS bulk data argument, and blocks the current thread until the...
Definition path.hh:1587
virtual void Died()
Note that a task has died.
Definition path.hh:2532
bool GetPathLoaded() const
Indicate if the task was loaded.
Definition path.hh:1017
virtual void ClearState()
Clear the task state.
Definition path.hh:1245
virtual void SetSymbols(const bool Flag=true)
Interpret file name as a symbol (VMS Target only)
Definition path.hh:1199
virtual bool Busy() const
Returns true if we can't send a message because path is busy.
Definition path.hh:1280
virtual bool Initial() const
Return true if a GetPath operation is required.
Definition path.hh:1269
virtual DitsPathType GetDitsPath(bool nothrow=false)
Return the underlying DITS path object.
bool KickBulkWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, BulkDataSds *argIn, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a kick message to a task and block the current thread until the reply is received or a timeout ...
Definition path.hh:2078
virtual void Delete(int force)
Delete the task.
bool KickBulkWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, BulkData *argIn, bool isSds, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a kick message to a task and block the current thread until the reply is received or a timeout ...
Definition path.hh:2000
bool GetParamWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::initializer_list< std::string > names, sds::IdPtr *const argOut, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a get parameter message to a task and blocks the current thread until it completes.
Definition path.hh:2363
bool ControlWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, const sds::Id &argIn=sds::Id::CreateNullItem(), sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a control message to a task and block the current thread until the reply is received or a timeo...
Definition path.hh:2478
static void SpawnKickArgUpdate(drama::sds::Id *arg, DitsTransIdType tid)
Update an SDS argument structure for kicking a Spawned transaction.
static drama::sds::Id SpawnKickArg(DitsTransIdType tid)
Create an SDS argument structure for kicking a Spawned transaction.
void KickBulk(thread::TMessHandler *action, const std::string &name, BulkDataSds *argIn, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a kick message, with SDS bulk data argument, to a task and blocks the current thread until the ...
Definition path.hh:1944
virtual void SetFile(const std::string &File)
Set the executable file name.
Definition path.hh:1094
virtual std::shared_ptr< Task > GetTask() const
Get a reference to the DRAMA task we are part of.
Definition path.hh:1290
virtual void SetProcess(const std::string &ProcessName)
Set the process names.
virtual void GetPathImmed()
Get a path to a task if DITS already has it.
void Obey(thread::TMessHandler *action, const std::string &name, const sds::Id &argIn=sds::Id::CreateNullItem(), sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends an obey message to a task and blocks the current thread until the action is complete.
Definition path.hh:1372
bool ObeyBulkWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, BulkDataSds *argIn, unsigned notifyBytes=1024 *1024, sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends an obey message to a task, with a SDS bulk data argument, and blocks the current thread until t...
Definition path.hh:1726
Path(std::weak_ptr< Task > theTask, DitsPathType path)
Construct from existing DITS path.
virtual void SetArgument(const std::string &LoadArg, bool const Append=false)
Set the load time argument string.
virtual void SetName(const std::string &TaskName)
Set the task name.
virtual void SetProg(const bool Flag=true)
Interpret file name as a program name.
Definition path.hh:1222
virtual void SetNames(const bool Flag=true)
Insist that symbols known to IMP_Master be known to the task.
Definition path.hh:1181
static std::chrono::steady_clock::time_point DeltaToTimePoint(int secs)
Given a number of seconds, return a time point that number of seconds into the future.
Definition path.hh:1419
virtual bool IsDead() const
Return true if a previously active task has disconnected.
Definition path.hh:1262
void Control(thread::TMessHandler *action, const std::string &name, const sds::Id &argIn=sds::Id::CreateNullItem(), sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends a control message to a task and blocks the current thread until the reply is received.
Definition path.hh:2417
virtual void LogLoad(const bool Flag=true)
Use <a href="../routines/MsgOut.html">MsgOut()</a> to output lines indicating the load operations.
Definition path.hh:1236
Path(std::weak_ptr< Task > theTask)
Path to self constructor.
void MonitorCancel(thread::TMessHandler *action, int monId, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Send a Monitor Cancel message to a task and block until it is complete.
Definition path.hh:2738
void MonitorForward(thread::TMessHandler *action, const sds::Id &argIn, MessageEventHandler *const eventProcessor=&_simpleMonEventProcessor)
Send a Monitor Forward message to a task and block until it is complete.
Definition path.hh:2625
const std::string GetTaskName() const
Fetch the task name.
Path()
Construct a null object.
Definition path.hh:996
virtual void SetStackSize(unsigned bytes)
Set the stack size for the load.
void GetPath(thread::TMessHandler *action, thread::TransEvtProcessor *const eventProcessor=&_getPathEventProcessor)
Get a path to the task.
bool ObeyWaitUntil(std::chrono::steady_clock::time_point until, thread::TMessHandler *action, const std::string &name, const sds::Id &argIn=sds::Id::CreateNullItem(), sds::IdPtr *const argOut=nullptr, MessageEventHandler *const eventProcessor=&_simpleEventProcessor)
Sends an obey message to a task and blocks the current thread until the action is complete or a timeo...
Definition path.hh:1453
virtual void SetHost(const std::string &Host)
Set the task host name.
Definition path.hh:1067
A Class which provides access to DRAMA's message sending facilities.
Definition path.hh:689
std::lock_guard< mutexType > guardType
Defines the type of a lock guard using our mutex type.
Definition task.hh:460
static Id CreateArgCmdStruct(const ContainerType &values, const std::string &name="ArgStructure")
Factory constructor which creates a new "Arg" style SDS structure in the DRAMA Command style.
Definition sds.hh:2754
static Id CreateArgStruct(const std::string &name="ArgStructure")
Factory constructor which creates a new "Arg" style SDS structure.
static Id CreateNullItem()
Factory constructor method that constructs a null sds::Id item.
A C++ Interface to the handling SDS structures.
Definition sds.hh:428
A class with information used by the TransEvtProcessor methods.
Definition thread.hh:147
This interface class must be implemented by classes which have threads waiting for messages.
Definition thread.hh:308
DRAMA 2 main include file.
#define DramaTHROW_S(status_, format_,...)
Throw a Drama exception with safe string formatting.
Definition exception.hh:110
std::shared_ptr< Id > IdPtr
A shared pointer for sds::Id items.
Definition sds.hh:3318
void CreateRunDramaTask()
Create and run a DRAMA task, with standard exception handling.
Definition task.hh:1322
std::chrono::steady_clock::time_point CreateFutureTimepoint(double secs)
Return a steady_clock time point suitable for use with the Path::...WaitUntil() methods based on an o...
Definition path.hh:94
@ Kick
Action has been kicked.
@ BulkTransferred
bulk data transferred message received
@ BulkDone
message received
@ Obey
Action has been obeyed.
std::vector< ErsMessageType > ErsMessageVector
A type used to return ERS messages.
Definition path.hh:180
The drama namespace contains all the classes, types etc of the DRAMA 2 implementation.
Definition drama.hh:93
StatusType entryStatus
Entry status value.
Definition task.hh:130
bool complete
Is the transaction complete.
Definition task.hh:133
Structure is used to store details about a DRAMA reschedule message relating to a transaction,...
Definition task.hh:128
DRAMA 2 include file - Code common to DRAMA 2 features supporting threading.