00001
00002 #ifndef DDF_SENSORENODEIPC_HPP
00003 #define DDF_SENSORENODEIPC_HPP
00004
00005 #include "sensornodegeneric.hpp"
00006 #include "mbx.hpp"
00007
00008 namespace jafar
00009 {
00010 namespace ddf
00011 {
00012 struct SNConnectCore
00013 {
00014 unsigned int from_node;
00015 unsigned int code;
00016 key_t chan_fltr_key;
00017
00018 };
00019
00020 struct SNConnectStruct
00021 {
00022 long mtype;
00023 SNConnectCore mtext;
00024 };
00025
00026 void PrintSNConnect(SNConnectStruct const& sn)
00027 {
00028 JFR_DEBUG("Type: " << SN_CONNECT_STRING[sn.mtype] << "\n");
00029 JFR_DEBUG("Node id: " << sn.mtext.from_node << "\n");
00030 JFR_DEBUG("Chan fltr key: " << sn.mtext.chan_fltr_key << "\n");
00031 JFR_DEBUG("Code: " << SN_CODE_STRING[sn.mtext.code] << "\n");
00032 }
00033
00038 template<typename PARENT>
00039 class ManagerThreadFunc
00040 {
00041 PARENT *m_par;
00042
00043 public:
00044 ManagerThreadFunc(PARENT *parent): m_par(parent) {}
00045 ~ManagerThreadFunc() {}
00046 void operator()()
00047 {
00048 m_par->SetupChanMgrComm();
00049
00050 while(m_par->ChanMgrAlive())
00051 {
00052 m_par->Listen();
00053 }
00054
00055 m_par->CleanChanMgrComm();
00056 }
00057 };
00058
00064 template<typename PARAMS>
00065 class SensorNodeIPC : public SensorNodeGeneric<PARAMS>
00066 {
00067 static const int MBOX_OFFSET = 1987;
00068
00069 ManagerThreadFunc<SensorNodeIPC<PARAMS> > m_ChanMgr_Fun;
00070 int m_msqid;
00071 bool m_chanMgr_alive;
00072 boost::thread *m_pChanMgr;
00073 mutable boost::mutex m_msqmutex;
00074
00075 public:
00076 SensorNodeIPC(unsigned short sv_size, time const& sync_period, PredictModelFactoryBase *pred_factory, \
00077 CommFactoryBase *comm_factory, bool use_delayed)
00078 : SensorNodeGeneric<PARAMS>(sv_size, this->m_static_id++, sync_period, pred_factory, comm_factory,\
00079 use_delayed),m_ChanMgr_Fun(this),m_chanMgr_alive(true)
00080 {
00081 m_pChanMgr = new boost::thread(m_ChanMgr_Fun);
00082 }
00083
00084 virtual ~SensorNodeIPC() {
00085 if (m_pChanMgr != NULL){
00086 m_chanMgr_alive = false;
00087 m_pChanMgr->join();
00088 delete m_pChanMgr;
00089 }
00090 }
00091
00092 virtual void InitNode() {}
00093 virtual void Start(time const& first_deadling) {}
00094 virtual void KillNode() {}
00095
00096 key_t GetChanMgrKey() { return this->GetID() + MBOX_OFFSET; }
00097 bool ChanMgrAlive() { return m_chanMgr_alive; }
00098
00099
00100 void DbgWaitInfoFromNode(unsigned short node_id) {
00101 time delay = time::from_us(2);
00102
00103 if(!this->IsNodeConnected(node_id)) return;
00104
00105
00106 ChannelFilterBase * cptr = this->GetChanFltbyNodeID(node_id);
00107 JFR_POSTCOND(cptr, "NULL");
00108 while(!cptr->HasJustReceivedInfo()) delay.sleep_period();
00109 cptr->ResetJustReceived();
00110 }
00111
00112
00113 void SetupChanMgrComm() { m_msqid = mbx_create(GetChanMgrKey()); }
00114
00115
00116 void Listen()
00117 {
00118 SNConnectStruct sn_connect;
00119 int res, from_node;
00120 int buf_size = sizeof(SNConnectCore);
00121
00122
00123 {
00124 boost::mutex::scoped_lock lock(m_msqmutex);
00125 res = mbx_rec(m_msqid, (msgbuf*) &sn_connect, buf_size, SN_CONNECT);
00126 }
00127
00128 if (res == MSG_REC)
00129 {
00130 JFR_VDEBUG( "Node " << this->m_id << " : received connection request from node " << sn_connect.mtext.from_node << "\n");
00131
00132 from_node = sn_connect.mtext.from_node;
00133 sn_connect.mtype = SN_CONFIRM_CONNECTION;
00134 sn_connect.mtext.from_node = this->GetID();
00135
00136
00137 if (this->IsNodeConnected(from_node))
00138 {
00139 sn_connect.mtext.code = SN_CODE_ALREADY_CONNECTED;
00140 }
00141 else
00142 {
00143
00144 CommBase & Comm = this->CreateChannelFilter(from_node);
00145 sn_connect.mtext.chan_fltr_key = Comm.GetCommId();
00146 sn_connect.mtext.code = SN_CODE_OK;
00147 }
00148
00149
00150 {
00151 boost::mutex::scoped_lock lock(m_msqmutex);
00152 mbx_send(m_msqid, (msgbuf *) &sn_connect, buf_size);
00153 JFR_VDEBUG( "Node " << this->m_id << " : send confirmation to node " << from_node
00154 << " with key " << sn_connect.mtext.chan_fltr_key << "\n");
00155 }
00156 }
00157
00158
00159 {
00160 boost::mutex::scoped_lock lock(m_msqmutex);
00161 res = mbx_rec(m_msqid, (msgbuf*) &sn_connect, buf_size, SN_DISCONNECT);
00162 }
00163
00164 if (res == MSG_REC) {
00165 JFR_VDEBUG( "Received de-connection request -> not yet implemented\n");
00166 }
00167 }
00168
00169 void CleanChanMgrComm() { boost::mutex::scoped_lock lock(m_msqmutex); mbx_destroy(m_msqid); }
00170
00171 SN_CODE_TYPE RequestConnection(unsigned short node_id)
00172 {
00173 int msqid, res, buffer_length;
00174 SNConnectStruct sbuf;
00175
00176 buffer_length = sizeof(SNConnectCore);
00177
00178 {
00179 boost::mutex::scoped_lock lock(m_msqmutex);
00180 msqid = mbx_getq((key_t) (node_id + MBOX_OFFSET));
00181 }
00182
00183 sbuf.mtype = (long) SN_CONNECT;
00184 sbuf.mtext.from_node = this->m_id;
00185 sbuf.mtext.chan_fltr_key = (key_t) 0;
00186 sbuf.mtext.code = SN_CODE_VOID;
00187
00188 JFR_VDEBUG( "Node " << this->m_id << " : send connection request to node " << node_id << "\n");
00189
00190 {
00191 boost::mutex::scoped_lock lock(m_msqmutex);
00192 mbx_blksend(msqid, (msgbuf *) &sbuf, buffer_length);
00193 res = mbx_blkrec(msqid, (msgbuf *) &sbuf, buffer_length, SN_CONFIRM_CONNECTION);
00194 }
00195
00196 if (res == MSG_REC)
00197 {
00198 JFR_VDEBUG( "Node " << this->m_id << " : receive confirmation from node " << sbuf.mtext.from_node << "\n");
00199
00200 if (sbuf.mtext.code == SN_CODE_ALREADY_CONNECTED) return SN_CODE_ALREADY_CONNECTED;
00201 else
00202 {
00203 this->CreateChannelFilter(sbuf.mtext.chan_fltr_key, sbuf.mtext.from_node);
00204 }
00205 }
00206 else {
00207 perror("could not receive confirmation");
00208 exit(EXIT_FAILURE);
00209 }
00210
00211 return SN_CODE_OK;
00212 }
00213 };
00214
00215 template<typename PARAMS>
00216 class SensorNodePeriodic : public SensorNodeIPC<PARAMS>
00217 {
00218 struct PerProcStruct
00219 {
00220 bool initialized;
00221 boost::thread *pthread;
00222 Timer_Command *cmd;
00223 Thread_Periodic_Func<SensorNodeBase> *pthread_func;
00224 time period;
00225 };
00226
00227 PerProcStruct m_sync_loop;
00228
00229 protected:
00230 using SensorNodeIPC<PARAMS>::m_next_sync_horizon;
00231 using SensorNodeIPC<PARAMS>::m_last_sync;
00232
00233 public:
00234 SensorNodePeriodic(unsigned short sv_size, time const& sync_period, PredictModelFactoryBase *pred_factory, \
00235 CommFactoryBase *comm_factory, bool use_delayed)
00236 : SensorNodeIPC<PARAMS>(sv_size, sync_period, pred_factory, comm_factory, use_delayed)
00237 {
00238 m_sync_loop.period = sync_period;
00239 m_sync_loop.initialized = false;
00240 }
00241
00242 virtual ~SensorNodePeriodic() { KillThread(); }
00243
00244 void Start(time const& first_deadline){
00245 time tmp(1e-3);
00246 m_next_sync_horizon = first_deadline;
00247 m_last_sync = first_deadline;
00248 m_sync_loop.cmd->Run(true);
00249 while (!m_sync_loop.cmd->IsInitialized()) tmp.sleep_period();
00250 }
00251
00252 void InitNode() {
00253 JFR_PRECOND(!m_sync_loop.initialized, "not initialized");
00254 CreatePeriodicThread();
00255 }
00256
00257 void KillNode(){
00258 KillThread();
00259 }
00260
00261 void Suspend() { m_sync_loop.cmd->Run(false); }
00262
00263 private:
00264
00265 void CreatePeriodicThread()
00266 {
00267 JFR_PRECOND(!m_sync_loop.initialized, "not initialized");
00268 m_sync_loop.cmd = new Timer_Command();
00269 m_sync_loop.pthread_func = new Thread_Periodic_Func<SensorNodeBase>(m_sync_loop.period, *m_sync_loop.cmd, this);
00270 m_sync_loop.pthread = new boost::thread(*m_sync_loop.pthread_func);
00271 m_sync_loop.initialized = true;
00272 }
00273
00274 void KillThread() {
00275 if(m_sync_loop.initialized)
00276 {
00277 m_sync_loop.cmd->Kill(true);
00278 m_sync_loop.pthread->join();
00279 Destroy();
00280 }
00281 }
00282
00283 void Destroy() {
00284 if (m_sync_loop.initialized) {
00285 delete m_sync_loop.cmd;
00286 delete m_sync_loop.pthread;
00287 delete m_sync_loop.pthread_func;
00288 m_sync_loop.initialized = false;
00289 }
00290 }
00291 };
00292
00293 }
00294 }
00295 #endif