Jafar
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
sensornodegeneric.hpp
00001 
00002 #ifndef DDF_SENSORNODEGENERIC_HPP
00003 #define DDF_SENSORNODEGENERIC_HPP
00004 
00005 #include "BayesFilter/bayesFlt.hpp"
00006 
00007 #include "predictenginebase.hpp"
00008 #include "predictmodelfactory.hpp"
00009 #include "commfactory.hpp"
00010 #include "infocontainer.hpp"
00011 #include "periodicthread.hpp"
00012 #include "channelfilterbase.hpp"
00013 #include "definitions.hpp"
00014 #include "ctime.hpp"
00015 #include "debugging.hpp"
00016 
00017 namespace jafar
00018 {
00019   namespace ddf
00020   {
00021 
00022     /* Implements a sensor node with three functions: a local filter (nodal filter), a sensor manager (connection /
00023        disconnection of other sensor nodes) and a synchronization mechanism for fusing information from the other nodes
00024        (through channel filters). A sensor node must be able to do state prediction (the same as in the channel filters).
00025 
00026        author: Pierre Lamon */
00027 
// Message kinds, status codes and return values exchanged when sensor nodes connect / disconnect.

// Kind of connection message. Values start at 1; index 0 of SN_CONNECT_STRING is the "SN_VOID" placeholder.
enum SN_CONNECT_TYPE
{
  SN_CONNECT = 1,
  SN_DISCONNECT,
  SN_CONFIRM_CONNECTION,
  SN_CONFIRM_DECONNECTION
};

// Status code attached to a connection message.
enum SN_CODE_TYPE
{
  SN_CODE_VOID,
  SN_CODE_OK,
  SN_CODE_ALREADY_CONNECTED
};

// Generic return value of sensor node operations.
enum SN_RET_TYPE
{
  SN_RET_OK,
  SN_RET_NOT_READY,
  SN_RET_FAIL
};

// Printable names, indexed by the numeric value of SN_CONNECT_TYPE.
const char SN_CONNECT_STRING[][30] =
{
  "SN_VOID",
  "Request connect",
  "Request disconnect",
  "Confirm connect",
  "Confirm disconnect"
};

// Printable names, indexed by the numeric value of SN_CODE_TYPE.
const char SN_CODE_STRING[][30] =
{
  "SN_CODE_VOID",
  "ok",
  "node already connected"
};
00035 
00036     // Used to store all what is needed for state prediction and update (used by PurgeFifo)
00037     template<typename PARAMS>
00038     class StoreiI
00039     {
00040     public:
00041       InfoContainer   m_info_cntr;
00042       PARAMS       m_params;
00043 
00044       StoreiI(unsigned short size) : m_info_cntr(size) {}
00045       ~StoreiI(){}
00046     };
00047 
00048 
00057     class SensorNodeBase
00058     {
00059     protected:
00060       typedef std::list<ChannelFilterBase *>   ChanFiltersBuf;
00061       static unsigned short  m_static_id;      // incremented each time a SensorNodeIPC is instancied
00062 
00063       unsigned short            m_id;                // node's id (must be unique)
00064       unsigned short            m_sv_size;           // size of the state vector (redundant but handy, avoid calling .size() each time)
00065       PredictionEngineBase      *m_pred_engine;      // contains the local information state and a prediction model
00066       PredictModelFactoryBase   *m_pred_factory;     // prediction model generator (used for both the local filter and channel filters)
00067       CommFactoryBase           *m_comm_factory;     // communication object generator (used for channel filter creation)
00068       ChanFiltersBuf            m_chan_filters;      // array containing the channel filters
00069       time                      m_next_sync_horizon; // the channel filters sync will occur at this time
00070       time                      m_time_orig;         // for debug only (time origin)
00071       time                      m_sync_period;       // the interval of time between two channel filters synchronization
00072       time                      m_last_sync;         // time of the last synchronization
00073       time                      m_time_eps;          // epsilon for merging info with similar timestamps
00074       bool                      m_just_sync;         // tells if the sensor node has just synchronized
00075       bool                      m_log_chan;          // activate/de-activate channel filter log
00076       mutable boost::mutex      m_chan_mutex;        // channel filters can be accessed asynchronoulsy (creation and synchronizatioin)
00077       mutable boost::try_mutex  m_local_mutex;       // for the local filter (asynchronous access: measurements and channel filters)
00078       mutable boost::mutex      m_next_horiz_mutex;  // for m_next_sync_horizon
00079    
00080     public:
00081       SensorNodeBase(unsigned short sv_size, unsigned short id, time const& sync_period, PredictModelFactoryBase  *pred_factory, \
00082                   CommFactoryBase *comm_factory, bool delayed_data);
00083    
00084       virtual ~SensorNodeBase();
00085 
00086       bool IsJustSync() const { return m_just_sync; }
00087       void SetNotSync() { m_just_sync = false; }
00088       time GetLastSyncTime() const { return m_last_sync; }
00089       void SetLastSyncTime(time const& last) { m_last_sync = last; }
00090       void SetEpsilon(time const& epsilon) { m_time_eps = epsilon; }
00091       time GetEpsilon() const { return m_time_eps; }
00092 
00093       // Set/Get functions
00094       void SetNow(time const& t_orig);
00095       time const& GetNow() const { return m_time_orig; }
00096       unsigned short GetID() { return m_id; }
00097       unsigned short GetSvSize() { return m_sv_size; }
00098       time const& GetSyncPeriod() { return m_sync_period; }
00099       void SetLogChanFilters(bool log) { m_log_chan = log; }
00100       bool GetLogChanFilters() { return m_log_chan; }
00101 
00103       void SetInitWithState(VEC const& x, MSYM const& X) { m_pred_engine->InitState(x, X); }
00104 
00106       void SetInitWithInfo(VEC const& y, MSYM const& Y) { m_pred_engine->InitInformation(y, Y); }
00107       void SetNextHorizon(time const& horizon) { boost::mutex::scoped_lock lock(m_next_horiz_mutex); m_next_sync_horizon = horizon; }
00108       time const& GetNextHorizon() { boost::mutex::scoped_lock lock(m_next_horiz_mutex); return m_next_sync_horizon; }
00109 
00111       time const& GetLocalInfoTime() const { return m_pred_engine->GetCurrentStateTime(); }
00112 
00113       void SetLocalInfoTime(time const& t);
00114       InfoContainer const& GetLocalInfo() const { return m_pred_engine->GetPrediction(); }
00115       InfoContainer const& GetLocalState() const { return m_pred_engine->GetInversePrediction(); }
00116 
00117       void PrintPredModel();
00118    
00120       bool IsNodeConnected(unsigned short other_node);
00121    
00123       virtual SN_CODE_TYPE  RequestConnection(unsigned short node_id)=0;
00124 
00126       virtual void SetupChanMgrComm()=0;
00127       virtual void Listen()=0;
00128       virtual void CleanChanMgrComm()=0;
00129 
00131       virtual void PurgeFifo()=0;
00132 
00134       void ExecSyncThreadFunc(time const& horizon, time const& elapsed, int call_nr);
00135 
00137       CommBase & CreateChannelFilter(unsigned short link_with);
00138       void CreateChannelFilter(key_t chan_key, unsigned short link_with);
00139 
00140 
00141       //private:
00143       void UpdateWithChannelFilters(time const& horizon);
00144 
00145     protected:
00146       ChannelFilterBase   * GetChanFltbyNodeID(unsigned short node_id);
00147     };
00148 
00157     template<typename PARAMS>
00158     class SensorNodeGeneric : public SensorNodeBase
00159     {
00160       typedef typename std::deque<StoreiI<PARAMS> >  StoreiIBuffer;
00161    
00162       StoreiIBuffer         m_iI_fifo;   // store iI in case m_local_mutex is not available immediately, avoid blocking in FeedWithSensorData
00163       mutable boost::mutex  m_fifo_mutex;
00164 
00165     public:
00166       SensorNodeGeneric(unsigned short sv_size, unsigned short id, time const& sync_period, \
00167                     PredictModelFactoryBase *pred_factory, CommFactoryBase *comm_factory, bool delayed_data)
00168         : SensorNodeBase(sv_size, id, sync_period, pred_factory, comm_factory, delayed_data) { }
00169    
00170       virtual ~SensorNodeGeneric() { }
00171 
00172       void SetHistoryLength(time const& hlength) { (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->SetHistoBufLength(hlength);}
00173       time GetHistoryLength() { return (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->GetHistoryLength(); }
00174       void ResizeHistory() { (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->ResizeHistory(); }
00175       bool GetHistoryClosestInfo(time const&t, InfoContainer &iCont) {
00176         return (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->GetHistoryClosestInfo(t, iCont);
00177       }
00178 
00182       void FeedWithSensorData(InfoContainer const& iI, PARAMS & params)
00183       {
00184         try // to get the mutex
00185         {
00186           boost::try_mutex::scoped_try_lock  lock(m_local_mutex);
00187 
00188           if(IsJustSync()) // modify h because a sync just occured
00189           {
00190             time local_time = GetLocalInfoTime();
00191             if (iI.GetTime() > local_time)   params.h = iI.GetTime() - local_time;
00192             // else this is a delayed data -> will be fused in PurgeFifo
00193             SetNotSync();
00194           }
00195           PushInFifo(iI, params);
00196           PurgeFifo();
00197         }
00198         catch(boost::lock_error)
00199         {
00200           JFR_VDEBUG("Can't get mutex!\n");
00201           PushInFifo(iI, params);                  // then store info, process later
00202         }
00203       }
00204    
00205       // local filter mutex must be locked GetNextHorizon won't change during the execution of the procedure
00206       void PurgeFifo()
00207       {
00208         typename StoreiIBuffer::iterator it;
00209         std::ostringstream stream;
00210         PredictionEngineGeneric<PARAMS> *pPred;
00211         time   obs_time, local_time, next_horiz;
00212       
00213         boost::mutex::scoped_lock   lockfifo(m_fifo_mutex);
00214         if(m_iI_fifo.empty()) return;
00215 
00216         pPred = dynamic_cast< PredictionEngineGeneric<PARAMS> * >(m_pred_engine);
00217         next_horiz = GetNextHorizon();
00218       
00219         // loop until either the buffer is empty or the observation timestamp > next_horiz
00220         for(it = m_iI_fifo.begin(); it != m_iI_fifo.end(); it++)
00221         {
00222           obs_time = (*it).m_info_cntr.GetTime();     // observation time, sensed time
00223           local_time = pPred->GetCurrentStateTime();
00224 
00225           JFR_VDEBUG("obs time: " << obs_time << " next_horiz: " << next_horiz << " m_last_sync:" << m_last_sync);
00226 
00227           if(obs_time <= m_last_sync && m_last_sync != time::infty())//next_horiz)
00228           {
00229             stream << "* Delayed data -> obs_time < m_last_sync " << obs_time <<  " < " << m_last_sync;
00230             
00231             if (!pPred->InsertDelayedMeasure((*it).m_info_cntr, local_time))
00232               stream << " ... Pending";
00233             else
00234               stream << " ... Done";
00235 
00236             JFR_DEBUG(stream.str());
00237             m_iI_fifo.pop_front();
00238           }
00239           else if(obs_time.check_close(local_time, m_time_eps))
00240           {
00241             JFR_DEBUG("~= local time");
00242             if(local_time > obs_time)
00243               (*it).m_info_cntr.SetTime(local_time);
00244             else
00245               pPred->SetCurrentStateTime(obs_time, false);
00246 
00247             // Update filter
00248             pPred->UpdatePredictionWith((*it).m_info_cntr);
00249             m_iI_fifo.pop_front();
00250           }
00251           else
00252             if(obs_time.check_close(next_horiz, m_time_eps))
00253             {
00254               JFR_DEBUG("~= next_horizon");
00255               // if the observation time is close to the next time horizon then we consider the same time
00256               (*it).m_info_cntr.SetTime(next_horiz);
00257 
00258               // predict the current state to the next horizon
00259               JFR_PRECOND(pPred->GetCurrentStateTime() <= next_horiz, "bad horizon");
00260               pPred->SetModelParams(pPred->GetCurrentStateTime(), next_horiz);
00261               pPred->Predict(next_horiz);
00262 
00263               // update
00264               pPred->UpdatePredictionWith((*it).m_info_cntr);
00265               m_iI_fifo.pop_front();
00266             }
00267             else if(obs_time != next_horiz)
00268             {
00269               // normal predict/update step
00270               JFR_VDEBUG("Normal fusion");
00271               pPred->UpdateModel((*it).m_params);
00272 
00273               try
00274               {
00275                 pPred->Predict(obs_time);
00276               }
00277               catch(Bayesian_filter::Numeric_exception)
00278               {
00279                 JFR_DEBUG("--- not PD node " << GetID() << "\n");
00280                 throw;
00281               }
00282               pPred->UpdatePredictionWith((*it).m_info_cntr);
00283               m_iI_fifo.pop_front();
00284             }
00285             else if(obs_time > next_horiz)
00286             {
00287               // this data will be fused later
00288               JFR_DEBUG("obs_time > next_horiz");
00289               return;
00290             }
00291             else
00292             {
00293               JFR_WARNING("Case not supported\n");
00294               exit(EXIT_FAILURE);
00295             }
00296         }
00297       }
00298 
00299       void PushInFifo(InfoContainer const& iI, PARAMS & params)
00300       {
00301         boost::mutex::scoped_lock   lockfifo(m_fifo_mutex);
00302         StoreiI<PARAMS> st_iI(GetSvSize());
00303       
00304         st_iI.m_info_cntr = iI;
00305         st_iI.m_params = params;
00306         m_iI_fifo.push_back(st_iI);
00307       }
00308     };
00309 
00310 
00311   } // namespace ddf
00312 } // namespace jafar
00313 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on Wed Oct 15 2014 00:37:17 for Jafar by doxygen 1.7.6.1
LAAS-CNRS