00001
00002 #ifndef DDF_SENSORNODEGENERIC_HPP
00003 #define DDF_SENSORNODEGENERIC_HPP
00004
00005 #include "BayesFilter/bayesFlt.hpp"
00006
00007 #include "predictenginebase.hpp"
00008 #include "predictmodelfactory.hpp"
00009 #include "commfactory.hpp"
00010 #include "infocontainer.hpp"
00011 #include "periodicthread.hpp"
00012 #include "channelfilterbase.hpp"
00013 #include "definitions.hpp"
00014 #include "ctime.hpp"
00015 #include "debugging.hpp"
00016
00017 namespace jafar
00018 {
00019 namespace ddf
00020 {
00021
00022
00023
00024
00025
00026
00027
00028
/// Kind of connection-management message exchanged between sensor nodes.
/// Values start at 1 so that they index SN_CONNECT_STRING past its "SN_VOID" slot.
enum SN_CONNECT_TYPE
{
  SN_CONNECT = 1,
  SN_DISCONNECT = 2,
  SN_CONFIRM_CONNECTION = 3,
  SN_CONFIRM_DECONNECTION = 4
};

/// Status code attached to a connection request; indexes SN_CODE_STRING.
enum SN_CODE_TYPE
{
  SN_CODE_VOID = 0,
  SN_CODE_OK = 1,
  SN_CODE_ALREADY_CONNECTED = 2
};

/// Generic return status of a sensor-node operation.
enum SN_RET_TYPE
{
  SN_RET_OK = 0,
  SN_RET_NOT_READY = 1,
  SN_RET_FAIL = 2
};

/// Printable label for each SN_CONNECT_TYPE value (slot 0 is the unused "void" entry).
const char SN_CONNECT_STRING[][30] =
{
  "SN_VOID",
  "Request connect",
  "Request disconnect",
  "Confirm connect",
  "Confirm disconnect"
};

/// Printable label for each SN_CODE_TYPE value.
const char SN_CODE_STRING[][30] =
{
  "SN_CODE_VOID",
  "ok",
  "node already connected"
};
00035
00036
00037 template<typename PARAMS>
00038 class StoreiI
00039 {
00040 public:
00041 InfoContainer m_info_cntr;
00042 PARAMS m_params;
00043
00044 StoreiI(unsigned short size) : m_info_cntr(size) {}
00045 ~StoreiI(){}
00046 };
00047
00048
00057 class SensorNodeBase
00058 {
00059 protected:
00060 typedef std::list<ChannelFilterBase *> ChanFiltersBuf;
00061 static unsigned short m_static_id;
00062
00063 unsigned short m_id;
00064 unsigned short m_sv_size;
00065 PredictionEngineBase *m_pred_engine;
00066 PredictModelFactoryBase *m_pred_factory;
00067 CommFactoryBase *m_comm_factory;
00068 ChanFiltersBuf m_chan_filters;
00069 time m_next_sync_horizon;
00070 time m_time_orig;
00071 time m_sync_period;
00072 time m_last_sync;
00073 time m_time_eps;
00074 bool m_just_sync;
00075 bool m_log_chan;
00076 mutable boost::mutex m_chan_mutex;
00077 mutable boost::try_mutex m_local_mutex;
00078 mutable boost::mutex m_next_horiz_mutex;
00079
00080 public:
00081 SensorNodeBase(unsigned short sv_size, unsigned short id, time const& sync_period, PredictModelFactoryBase *pred_factory, \
00082 CommFactoryBase *comm_factory, bool delayed_data);
00083
00084 virtual ~SensorNodeBase();
00085
00086 bool IsJustSync() const { return m_just_sync; }
00087 void SetNotSync() { m_just_sync = false; }
00088 time GetLastSyncTime() const { return m_last_sync; }
00089 void SetLastSyncTime(time const& last) { m_last_sync = last; }
00090 void SetEpsilon(time const& epsilon) { m_time_eps = epsilon; }
00091 time GetEpsilon() const { return m_time_eps; }
00092
00093
00094 void SetNow(time const& t_orig);
00095 time const& GetNow() const { return m_time_orig; }
00096 unsigned short GetID() { return m_id; }
00097 unsigned short GetSvSize() { return m_sv_size; }
00098 time const& GetSyncPeriod() { return m_sync_period; }
00099 void SetLogChanFilters(bool log) { m_log_chan = log; }
00100 bool GetLogChanFilters() { return m_log_chan; }
00101
00103 void SetInitWithState(VEC const& x, MSYM const& X) { m_pred_engine->InitState(x, X); }
00104
00106 void SetInitWithInfo(VEC const& y, MSYM const& Y) { m_pred_engine->InitInformation(y, Y); }
00107 void SetNextHorizon(time const& horizon) { boost::mutex::scoped_lock lock(m_next_horiz_mutex); m_next_sync_horizon = horizon; }
00108 time const& GetNextHorizon() { boost::mutex::scoped_lock lock(m_next_horiz_mutex); return m_next_sync_horizon; }
00109
00111 time const& GetLocalInfoTime() const { return m_pred_engine->GetCurrentStateTime(); }
00112
00113 void SetLocalInfoTime(time const& t);
00114 InfoContainer const& GetLocalInfo() const { return m_pred_engine->GetPrediction(); }
00115 InfoContainer const& GetLocalState() const { return m_pred_engine->GetInversePrediction(); }
00116
00117 void PrintPredModel();
00118
00120 bool IsNodeConnected(unsigned short other_node);
00121
00123 virtual SN_CODE_TYPE RequestConnection(unsigned short node_id)=0;
00124
00126 virtual void SetupChanMgrComm()=0;
00127 virtual void Listen()=0;
00128 virtual void CleanChanMgrComm()=0;
00129
00131 virtual void PurgeFifo()=0;
00132
00134 void ExecSyncThreadFunc(time const& horizon, time const& elapsed, int call_nr);
00135
00137 CommBase & CreateChannelFilter(unsigned short link_with);
00138 void CreateChannelFilter(key_t chan_key, unsigned short link_with);
00139
00140
00141
00143 void UpdateWithChannelFilters(time const& horizon);
00144
00145 protected:
00146 ChannelFilterBase * GetChanFltbyNodeID(unsigned short node_id);
00147 };
00148
00157 template<typename PARAMS>
00158 class SensorNodeGeneric : public SensorNodeBase
00159 {
00160 typedef typename std::deque<StoreiI<PARAMS> > StoreiIBuffer;
00161
00162 StoreiIBuffer m_iI_fifo;
00163 mutable boost::mutex m_fifo_mutex;
00164
00165 public:
00166 SensorNodeGeneric(unsigned short sv_size, unsigned short id, time const& sync_period, \
00167 PredictModelFactoryBase *pred_factory, CommFactoryBase *comm_factory, bool delayed_data)
00168 : SensorNodeBase(sv_size, id, sync_period, pred_factory, comm_factory, delayed_data) { }
00169
00170 virtual ~SensorNodeGeneric() { }
00171
00172 void SetHistoryLength(time const& hlength) { (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->SetHistoBufLength(hlength);}
00173 time GetHistoryLength() { return (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->GetHistoryLength(); }
00174 void ResizeHistory() { (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->ResizeHistory(); }
00175 bool GetHistoryClosestInfo(time const&t, InfoContainer &iCont) {
00176 return (dynamic_cast<PredictionEngineGeneric<PARAMS> * >(m_pred_engine))->GetHistoryClosestInfo(t, iCont);
00177 }
00178
00182 void FeedWithSensorData(InfoContainer const& iI, PARAMS & params)
00183 {
00184 try
00185 {
00186 boost::try_mutex::scoped_try_lock lock(m_local_mutex);
00187
00188 if(IsJustSync())
00189 {
00190 time local_time = GetLocalInfoTime();
00191 if (iI.GetTime() > local_time) params.h = iI.GetTime() - local_time;
00192
00193 SetNotSync();
00194 }
00195 PushInFifo(iI, params);
00196 PurgeFifo();
00197 }
00198 catch(boost::lock_error)
00199 {
00200 JFR_VDEBUG("Can't get mutex!\n");
00201 PushInFifo(iI, params);
00202 }
00203 }
00204
00205
00206 void PurgeFifo()
00207 {
00208 typename StoreiIBuffer::iterator it;
00209 std::ostringstream stream;
00210 PredictionEngineGeneric<PARAMS> *pPred;
00211 time obs_time, local_time, next_horiz;
00212
00213 boost::mutex::scoped_lock lockfifo(m_fifo_mutex);
00214 if(m_iI_fifo.empty()) return;
00215
00216 pPred = dynamic_cast< PredictionEngineGeneric<PARAMS> * >(m_pred_engine);
00217 next_horiz = GetNextHorizon();
00218
00219
00220 for(it = m_iI_fifo.begin(); it != m_iI_fifo.end(); it++)
00221 {
00222 obs_time = (*it).m_info_cntr.GetTime();
00223 local_time = pPred->GetCurrentStateTime();
00224
00225 JFR_VDEBUG("obs time: " << obs_time << " next_horiz: " << next_horiz << " m_last_sync:" << m_last_sync);
00226
00227 if(obs_time <= m_last_sync && m_last_sync != time::infty())
00228 {
00229 stream << "* Delayed data -> obs_time < m_last_sync " << obs_time << " < " << m_last_sync;
00230
00231 if (!pPred->InsertDelayedMeasure((*it).m_info_cntr, local_time))
00232 stream << " ... Pending";
00233 else
00234 stream << " ... Done";
00235
00236 JFR_DEBUG(stream.str());
00237 m_iI_fifo.pop_front();
00238 }
00239 else if(obs_time.check_close(local_time, m_time_eps))
00240 {
00241 JFR_DEBUG("~= local time");
00242 if(local_time > obs_time)
00243 (*it).m_info_cntr.SetTime(local_time);
00244 else
00245 pPred->SetCurrentStateTime(obs_time, false);
00246
00247
00248 pPred->UpdatePredictionWith((*it).m_info_cntr);
00249 m_iI_fifo.pop_front();
00250 }
00251 else
00252 if(obs_time.check_close(next_horiz, m_time_eps))
00253 {
00254 JFR_DEBUG("~= next_horizon");
00255
00256 (*it).m_info_cntr.SetTime(next_horiz);
00257
00258
00259 JFR_PRECOND(pPred->GetCurrentStateTime() <= next_horiz, "bad horizon");
00260 pPred->SetModelParams(pPred->GetCurrentStateTime(), next_horiz);
00261 pPred->Predict(next_horiz);
00262
00263
00264 pPred->UpdatePredictionWith((*it).m_info_cntr);
00265 m_iI_fifo.pop_front();
00266 }
00267 else if(obs_time != next_horiz)
00268 {
00269
00270 JFR_VDEBUG("Normal fusion");
00271 pPred->UpdateModel((*it).m_params);
00272
00273 try
00274 {
00275 pPred->Predict(obs_time);
00276 }
00277 catch(Bayesian_filter::Numeric_exception)
00278 {
00279 JFR_DEBUG("--- not PD node " << GetID() << "\n");
00280 throw;
00281 }
00282 pPred->UpdatePredictionWith((*it).m_info_cntr);
00283 m_iI_fifo.pop_front();
00284 }
00285 else if(obs_time > next_horiz)
00286 {
00287
00288 JFR_DEBUG("obs_time > next_horiz");
00289 return;
00290 }
00291 else
00292 {
00293 JFR_WARNING("Case not supported\n");
00294 exit(EXIT_FAILURE);
00295 }
00296 }
00297 }
00298
00299 void PushInFifo(InfoContainer const& iI, PARAMS & params)
00300 {
00301 boost::mutex::scoped_lock lockfifo(m_fifo_mutex);
00302 StoreiI<PARAMS> st_iI(GetSvSize());
00303
00304 st_iI.m_info_cntr = iI;
00305 st_iI.m_params = params;
00306 m_iI_fifo.push_back(st_iI);
00307 }
00308 };
00309
00310
00311 }
00312 }
00313 #endif