Jafar
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
sensornodeipc.hpp
00001 
00002 #ifndef DDF_SENSORENODEIPC_HPP
00003 #define DDF_SENSORENODEIPC_HPP
00004 
00005 #include "sensornodegeneric.hpp"
00006 #include "mbx.hpp"
00007 
00008 namespace jafar
00009 {
00010   namespace ddf
00011   {
00012     struct SNConnectCore
00013     {
00014       unsigned int from_node;       // the id of the node who sent the message
00015       unsigned int code;            // contains ok or the reason of failure (SN_CODE_TYPE)
00016       key_t        chan_fltr_key; // the key for the connection between channel filters
00017       // the node who receive the connection request choose the key (ftok)
00018     };
00019 
00020     struct SNConnectStruct
00021     {
00022       long            mtype;  // one of the SN_CONNECT_TYPE
00023       SNConnectCore   mtext;
00024     };
00025 
00026     void PrintSNConnect(SNConnectStruct const& sn)
00027     {
00028       JFR_DEBUG("Type: " << SN_CONNECT_STRING[sn.mtype] << "\n");
00029       JFR_DEBUG("Node id: " << sn.mtext.from_node << "\n");
00030       JFR_DEBUG("Chan fltr key: " << sn.mtext.chan_fltr_key << "\n");
00031       JFR_DEBUG("Code: " << SN_CODE_STRING[sn.mtext.code] << "\n");
00032     }
00033 
00038     template<typename PARENT>
00039     class ManagerThreadFunc
00040     {
00041       PARENT *m_par;
00042 
00043     public:
00044       ManagerThreadFunc(PARENT *parent): m_par(parent) {}
00045       ~ManagerThreadFunc() {}
00046       void operator()()
00047       {
00048         m_par->SetupChanMgrComm();
00049       
00050         while(m_par->ChanMgrAlive())
00051         {
00052           m_par->Listen(); // function managing connection/deconnection of sensor nodes
00053         }
00054 
00055         m_par->CleanChanMgrComm();
00056       }
00057     };
00058 
00064     template<typename PARAMS>
00065     class SensorNodeIPC : public SensorNodeGeneric<PARAMS>
00066     {
00067       static const int  MBOX_OFFSET = 1987;
00068 
00069       ManagerThreadFunc<SensorNodeIPC<PARAMS> > m_ChanMgr_Fun; // Function associated to the channel filters manager
00070       int               m_msqid;          // the message queue id for connection/deconnection from other nodes
00071       bool              m_chanMgr_alive;  // the channel manager quits if set to false
00072       boost::thread *m_pChanMgr;      // this is the channel manager thread
00073       mutable boost::mutex m_msqmutex;
00074 
00075     public:
00076       SensorNodeIPC(unsigned short sv_size, time const& sync_period, PredictModelFactoryBase *pred_factory, \
00077                 CommFactoryBase *comm_factory, bool use_delayed)
00078       : SensorNodeGeneric<PARAMS>(sv_size, this->m_static_id++, sync_period, pred_factory, comm_factory,\
00079                             use_delayed),m_ChanMgr_Fun(this),m_chanMgr_alive(true)
00080       {
00081         m_pChanMgr = new boost::thread(m_ChanMgr_Fun);
00082       }
00083 
00084       virtual ~SensorNodeIPC() {
00085         if (m_pChanMgr != NULL){
00086           m_chanMgr_alive = false;
00087           m_pChanMgr->join();
00088           delete m_pChanMgr;
00089         }
00090       }
00091 
00092       virtual void InitNode() {}
00093       virtual void Start(time const& first_deadling) {}
00094       virtual void KillNode() {}
00095 
00096       key_t GetChanMgrKey() { return this->GetID() + MBOX_OFFSET; }
00097       bool ChanMgrAlive() { return m_chanMgr_alive; }
00098 
00099       // Wait until a message has been received from node_id
00100       void DbgWaitInfoFromNode(unsigned short node_id) {
00101         time delay = time::from_us(2);
00102 
00103         if(!this->IsNodeConnected(node_id)) return;
00104       
00105         // Get the right comm pointer
00106         ChannelFilterBase * cptr = this->GetChanFltbyNodeID(node_id);
00107         JFR_POSTCOND(cptr, "NULL");
00108         while(!cptr->HasJustReceivedInfo()) delay.sleep_period();
00109         cptr->ResetJustReceived();
00110       }
00111 
00112       // Create the message queue for receiving connection requests and sending confirmation
00113       void SetupChanMgrComm() { m_msqid = mbx_create(GetChanMgrKey());  }
00114 
00115       // Called by the channel manager
00116       void Listen()
00117       {
00118         SNConnectStruct sn_connect;
00119         int res, from_node;
00120         int buf_size = sizeof(SNConnectCore);
00121 
00122         // Listen to connection requests
00123         {
00124           boost::mutex::scoped_lock lock(m_msqmutex);
00125           res = mbx_rec(m_msqid, (msgbuf*) &sn_connect, buf_size, SN_CONNECT);
00126         }
00127       
00128         if (res == MSG_REC)
00129         {
00130           JFR_VDEBUG( "Node " << this->m_id << " : received connection request from node " << sn_connect.mtext.from_node << "\n");
00131 
00132           from_node = sn_connect.mtext.from_node;
00133           sn_connect.mtype = SN_CONFIRM_CONNECTION;
00134           sn_connect.mtext.from_node = this->GetID();
00135 
00136           // Check if the node is already connected
00137           if (this->IsNodeConnected(from_node))
00138           {
00139             sn_connect.mtext.code = SN_CODE_ALREADY_CONNECTED;
00140           }
00141           else
00142           {
00143           // Create a new channel filter
00144           CommBase & Comm = this->CreateChannelFilter(from_node);
00145           sn_connect.mtext.chan_fltr_key = Comm.GetCommId();
00146           sn_connect.mtext.code = SN_CODE_OK;
00147           }
00148 
00149           // Send confirmation
00150           {
00151             boost::mutex::scoped_lock lock(m_msqmutex);
00152             mbx_send(m_msqid, (msgbuf *) &sn_connect, buf_size);
00153             JFR_VDEBUG( "Node " << this->m_id << " : send confirmation to node " << from_node
00154                   << " with key " << sn_connect.mtext.chan_fltr_key << "\n");
00155           }
00156         }
00157 
00158         // Listen to disconnection requests
00159         {
00160           boost::mutex::scoped_lock lock(m_msqmutex);
00161           res = mbx_rec(m_msqid, (msgbuf*) &sn_connect, buf_size, SN_DISCONNECT);
00162         }
00163       
00164         if (res == MSG_REC) {
00165           JFR_VDEBUG( "Received de-connection request -> not yet implemented\n");
00166         }
00167       }
00168 
00169       void CleanChanMgrComm() { boost::mutex::scoped_lock lock(m_msqmutex); mbx_destroy(m_msqid); }
00170 
00171       SN_CODE_TYPE RequestConnection(unsigned short node_id)
00172       {
00173         int msqid, res, buffer_length;
00174         SNConnectStruct sbuf;
00175 
00176         buffer_length = sizeof(SNConnectCore);
00177 
00178         {
00179           boost::mutex::scoped_lock lock(m_msqmutex);
00180           msqid = mbx_getq((key_t) (node_id + MBOX_OFFSET)); // Get the message queue id of node_id (for sending connection request)
00181         }
00182       
00183         sbuf.mtype = (long) SN_CONNECT;
00184         sbuf.mtext.from_node = this->m_id;    // identify the connection request source
00185         sbuf.mtext.chan_fltr_key = (key_t) 0; // the receiving node will fill this field
00186         sbuf.mtext.code = SN_CODE_VOID;
00187 
00188         JFR_VDEBUG( "Node " << this->m_id << " : send connection request to node " << node_id << "\n");
00189   
00190         {
00191           boost::mutex::scoped_lock lock(m_msqmutex);
00192           mbx_blksend(msqid, (msgbuf *) &sbuf, buffer_length);  // Blocking behaviour for sending
00193           res = mbx_blkrec(msqid, (msgbuf *) &sbuf, buffer_length, SN_CONFIRM_CONNECTION);
00194         }
00195       
00196         if (res == MSG_REC)
00197         {
00198           JFR_VDEBUG( "Node " << this->m_id << " : receive confirmation from node " << sbuf.mtext.from_node << "\n");
00199 
00200           if (sbuf.mtext.code == SN_CODE_ALREADY_CONNECTED) return SN_CODE_ALREADY_CONNECTED;
00201           else
00202           {
00203               this->CreateChannelFilter(sbuf.mtext.chan_fltr_key, sbuf.mtext.from_node);
00204           }
00205         }
00206         else {
00207           perror("could not receive confirmation");
00208           exit(EXIT_FAILURE);
00209         }
00210 
00211         return SN_CODE_OK;
00212       }
00213     };
00214 
00215     template<typename PARAMS>
00216     class SensorNodePeriodic : public SensorNodeIPC<PARAMS>
00217     {
00218       struct PerProcStruct // container structure for a periodic process
00219       {
00220         bool            initialized; // tells if the structure is initialized (valid pointers etc.)
00221         boost::thread *pthread;    // the main thread
00222         Timer_Command *cmd;        // for controlling the execution of the thread
00223         Thread_Periodic_Func<SensorNodeBase> *pthread_func; // the function executed by the thread (associated to the thread)
00224         time            period; // period of time between the calls of pthread_func
00225       };
00226 
00227       PerProcStruct m_sync_loop; // the synchronization process (periodic task)
00228 
00229     protected:
00230       using SensorNodeIPC<PARAMS>::m_next_sync_horizon;
00231       using SensorNodeIPC<PARAMS>::m_last_sync;
00232       
00233     public:
00234       SensorNodePeriodic(unsigned short sv_size, time const& sync_period, PredictModelFactoryBase *pred_factory, \
00235                   CommFactoryBase *comm_factory, bool use_delayed)
00236         : SensorNodeIPC<PARAMS>(sv_size, sync_period, pred_factory, comm_factory, use_delayed)
00237       {
00238         m_sync_loop.period = sync_period;
00239         m_sync_loop.initialized = false;
00240       }
00241 
00242       virtual ~SensorNodePeriodic() { KillThread(); }
00243 
00244       void Start(time const& first_deadline){
00245         time tmp(1e-3);
00246         m_next_sync_horizon = first_deadline;
00247         m_last_sync = first_deadline;
00248         m_sync_loop.cmd->Run(true);
00249         while (!m_sync_loop.cmd->IsInitialized()) tmp.sleep_period(); // wait until the periodic thread is running
00250       }
00251 
00252       void InitNode() {
00253         JFR_PRECOND(!m_sync_loop.initialized, "not initialized");
00254         CreatePeriodicThread();
00255       }
00256 
00257       void KillNode(){
00258         KillThread();
00259       }
00260 
00261       void Suspend() { m_sync_loop.cmd->Run(false); }
00262       
00263     private:
00264 
00265       void CreatePeriodicThread()
00266       {
00267         JFR_PRECOND(!m_sync_loop.initialized, "not initialized");
00268         m_sync_loop.cmd = new Timer_Command();
00269         m_sync_loop.pthread_func = new Thread_Periodic_Func<SensorNodeBase>(m_sync_loop.period, *m_sync_loop.cmd, this);
00270         m_sync_loop.pthread = new boost::thread(*m_sync_loop.pthread_func);
00271         m_sync_loop.initialized = true;
00272       }
00273       
00274       void KillThread() {
00275         if(m_sync_loop.initialized)
00276         {
00277           m_sync_loop.cmd->Kill(true); // Ask the thread to return
00278           m_sync_loop.pthread->join(); // Wait until it joins
00279           Destroy();
00280         }
00281       }
00282 
00283       void Destroy() {
00284         if (m_sync_loop.initialized) {
00285         delete m_sync_loop.cmd;
00286           delete m_sync_loop.pthread;
00287           delete m_sync_loop.pthread_func;
00288           m_sync_loop.initialized = false;
00289         }
00290       }
00291     };
00292 
00293   } // namespace ddf
00294 } // namespace jafar
00295 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on Wed Oct 15 2014 00:37:17 for Jafar by doxygen 1.7.6.1
LAAS-CNRS