BeeeOn Gateway  v2020.3.1-2-g6f737dc
Platform to interconnect the IoT world
QueuingExporter.h
1 #pragma once
2 
3 #include <deque>
4 
5 #include <Poco/Event.h>
6 #include <Poco/Mutex.h>
7 #include <Poco/SharedPtr.h>
8 
9 #include "core/Exporter.h"
10 #include "exporters/QueuingStrategy.h"
11 #include "model/SensorData.h"
12 #include "util/Loggable.h"
13 
14 namespace BeeeOn {
15 
/**
 * Exporter that holds SensorData in an in-memory std::deque (m_queue)
 * and cooperates with a QueuingStrategy (m_strategy).
 *
 * Only the declarations are visible in this header; the behaviour is
 * implemented in QueuingExporter.cpp. Statements below that go beyond
 * what the declarations show are marked as assumptions to be confirmed
 * against that file.
 *
 * The public section is the configuration and Exporter-facing API, the
 * protected section is offered to subclasses, and the private section
 * holds internal helpers and state.
 *
 * NOTE(review): std::vector, uint32_t, Poco::Timespan and Poco::Timestamp
 * are used below, but no direct #include for them is visible in this
 * header; presumably they arrive transitively - confirm.
 */
45 class QueuingExporter : public Exporter, protected Loggable {
46 public:
    /** Shared-pointer alias for passing QueuingExporter instances around. */
47  typedef Poco::SharedPtr<QueuingExporter> Ptr;
48 
50  ~QueuingExporter() override;
51 
    /**
     * Accepts a single SensorData. Overrides a base-class virtual
     * (presumably Exporter::ship).
     * The meaning of the bool result is not visible here; presumably
     * true means the data was accepted - confirm in QueuingExporter.cpp.
     */
56  bool ship(const SensorData &data) override;
57 
    /** Sets the QueuingStrategy (presumably stored in m_strategy). */
58  void setStrategy(const QueuingStrategy::Ptr strategy);
59 
    /**
     * Configures the save threshold as a count of data items.
     * NOTE(review): the parameter is a signed int while m_saveThreshold
     * is size_t; how negative input is treated is not visible here.
     */
70  void setSaveThreshold(const int dataCount);
71 
    /** Configures the save timeout (presumably stored in m_saveTimeout). */
78  void setSaveTimeout(const Poco::Timespan &timeout);
79 
    /**
     * Configures the strategy priority, given as a percentage.
     * Presumably stored in m_backupPriority; the accepted range is not
     * visible here - confirm.
     */
87  void setStrategyPriority(const int percent);
88 
89 protected:
    /**
     * Obtains data into the caller-provided vector.
     * @param data    output vector receiving the SensorData
     * @param count   requested number of items (presumably an upper bound)
     * @param timeout time limit (presumably for waiting until data is
     *                available, see waitNotEmpty())
     */
102  void acquire(
103  std::vector<SensorData> &data,
104  size_t count,
105  const Poco::Timespan &timeout);
106 
    /**
     * Acknowledges previously acquired data. Presumably confirms that
     * the last acquire() result was exported successfully - confirm.
     */
111  void ack();
112 
    /**
     * Resets the acquire state. Presumably makes un-acked data available
     * to the next acquire() again - confirm.
     */
116  void reset();
117 
    /**
     * Reports emptiness. Which storage is consulted (the in-memory queue,
     * the strategy, or both) is not visible here.
     */
121  bool empty() const;
122 
123 private:
    /** Presumably decides from m_saveThreshold/m_saveTimeout whether the queue should be saved. */
124  bool shouldSave() const;
    /** Returns a queue size; whether acquired items are excluded is not visible here. */
125  size_t queueSize() const;
126 
    /** Waits with the given time limit (presumably on m_notEmpty); meaning of the result to be confirmed. */
127  bool waitNotEmpty(const Poco::Timespan &timeout);
128 
    /**
     * Fills data with up to count items and reports two counters through
     * the reference parameters. Presumably combines items from m_queue
     * (acquired) with items peeked from the strategy's backup (peeked),
     * weighted by m_backupPriority - confirm in QueuingExporter.cpp.
     */
174  void mix(
175  std::vector<SensorData> &data,
176  size_t count,
177  size_t &acquired,
178  size_t &peeked);
179 
    /** Helper of mix(); presumably computes how many items to take from the backup. */
180  size_t mixFromBackup(
181  size_t requiredCount,
182  size_t queueDataCount,
183  size_t backupPriority,
184  double remainder);
185 
    /** Helper of mix(); presumably maintains m_mixRemainder and m_previousMixRemainder. */
186  void updateRemaindersAfterPeek(
187  size_t peeked,
188  size_t requiredToPeek,
189  double potentialRemainder);
190 
    /** Helper of mix(); presumably computes how many items to take from m_queue. */
191  size_t mixFromQueue(size_t toAcquire, size_t queueDataCount);
192 
    /**
     * Save the queue while skipping the first skipFirst items.
     * Presumably saveQueue() wraps doSaveQueue() (e.g. with error
     * handling) and the target is the strategy - confirm.
     */
193  void saveQueue(size_t skipFirst);
194  void doSaveQueue(size_t skipFirst);
195 
196 private:
    /** Mutex, declared mutable so that const methods can lock it; presumably protects the queue state. */
197  mutable Poco::Mutex m_queueMutex;
198 
199  QueuingStrategy::Ptr m_strategy;
200 
    /** Event presumably signalled when data becomes available. */
201  Poco::Event m_notEmpty;
202 
203  uint32_t m_backupPriority;
204  size_t m_saveThreshold;
205  Poco::Timespan m_saveTimeout;
206 
    /** Counters presumably tracking the outcome of the last acquire() until ack()/reset(). */
207  size_t m_acquiredDataCount;
208  size_t m_peekedDataCount;
209  Poco::Timestamp m_lastExport;
210  std::deque<SensorData> m_queue;
211 
212  bool m_acked;
    /** Fractional carry-over used by the mixing helpers (exact semantics in QueuingExporter.cpp). */
213  double m_mixRemainder;
214  double m_previousMixRemainder;
215 };
216 
217 }
void reset()
Definition: QueuingExporter.cpp:230
Definition: SensorData.h:20
void setStrategyPriority(const int percent)
Definition: QueuingExporter.cpp:51
Definition: Exporter.h:11
bool empty() const
Definition: QueuingExporter.cpp:59
Implements the Exporter interface and provides queuing that prevents SensorData loss.
Definition: QueuingExporter.h:45
void ack()
Definition: QueuingExporter.cpp:213
void acquire(std::vector< SensorData > &data, size_t count, const Poco::Timespan &timeout)
Definition: QueuingExporter.cpp:125
bool ship(const SensorData &data) override
Definition: QueuingExporter.cpp:107
void setSaveThreshold(const int dataCount)
Definition: QueuingExporter.cpp:35
Definition: Loggable.h:19
void setSaveTimeout(const Poco::Timespan &timeout)
Definition: QueuingExporter.cpp:43