BeeeOn Gateway  v2020.3.1-2-g6f737dc
Platform to interconnect the IoT world
JournalQueuingStrategy.h
1 #pragma once
2 
3 #include <functional>
4 #include <list>
5 #include <map>
6 
7 #include <Poco/DigestEngine.h>
8 #include <Poco/File.h>
9 #include <Poco/Path.h>
10 #include <Poco/Timespan.h>
11 #include <Poco/Timestamp.h>
12 
13 #include "exporters/QueuingStrategy.h"
14 #include "util/Journal.h"
15 #include "util/Loggable.h"
16 
17 namespace BeeeOn {
18 
40 class JournalQueuingStrategy : public QueuingStrategy, protected Loggable {
41 public:
43 
47  void setRootDir(const std::string &path);
48 
52  Poco::Path rootDir() const;
53 
58  void setDisableGC(bool disable);
59 
64  void setNeverDropOldest(bool neverDrop);
65 
71  void setBytesLimit(int bytes);
72 
77  bool overLimit(size_t bytes) const;
78 
86  void setIgnoreIndexErrors(bool ignore);
87 
93  virtual void setup();
94 
99  bool empty() override;
100 
106  void push(const std::vector<SensorData> &data) override;
107 
117  size_t peek(std::vector<SensorData> &data, size_t count) override;
118 
123  void pop(size_t count) override;
124 
125 protected:
126  typedef std::function<void(
127  const std::string &name,
128  size_t offset,
129  Poco::Timestamp &newest)> BrokenHandler;
135  Poco::Timestamp initIndexAndScan(BrokenHandler broken);
136 
141  void initIndex(const Poco::Path &index);
142 
148  void prescanBuffers(Poco::Timestamp &newest, BrokenHandler broken);
149 
153  void reportStats(const Poco::Timestamp &newest) const;
154 
159  const std::string &name,
160  size_t offset,
161  Poco::Timestamp &newest);
162 
166  Journal::Ptr index() const;
167 
171  std::string writeData(
172  const std::string &data,
173  bool force = true) const;
174 
179  bool whipeFile(Poco::File file, bool usuallyFails = false) const;
180 
186  void collectReferenced(std::set<std::string> &referenced) const;
187 
192  bool garbageCollect(const size_t bytes);
193 
198  void dropOldestBuffers(const size_t bytes);
199 
203  size_t bytesUsed() const;
204 
209  size_t bytesUsedAll() const;
210 
214  Poco::Path pathTo(const std::string &name) const;
215 
219  static std::string tsString(const Poco::Timestamp &t);
220 
227  class Entry {
228  public:
229  Entry(
230  const SensorData &data,
231  const std::string &buffer,
232  const size_t nextOffset);
233 
234  const SensorData &data() const;
235  std::string buffer() const;
236  size_t nextOffset() const;
237 
238  private:
239  SensorData m_data;
240  std::string m_buffer;
241  size_t m_nextOffset;
242  };
243 
252  size_t precacheEntries(size_t count);
253 
259  size_t readEntries(
260  std::function<void(const Entry &entry)> proc,
261  size_t count);
262 
267  struct FileBufferStat {
268  Poco::Timestamp oldest = Poco::Timestamp::TIMEVAL_MAX;
269  Poco::Timestamp newest = Poco::Timestamp::TIMEVAL_MIN;
270  size_t bytes = 0;
271  size_t offset = 0;
272  size_t broken = 0;
273  size_t count = 0;
274 
275  void update(const Poco::Timestamp &timestamp);
276  };
277 
282  class FileBuffer {
283  public:
284  FileBuffer(
285  const Poco::Path &path,
286  size_t offset,
287  size_t size);
288 
289  std::string name() const;
290  Poco::Path path() const;
291  size_t offset() const;
292  size_t size() const;
293 
298  bool exhausted() const;
299 
305  size_t readEntries(
306  std::function<void(const Entry &entry)> proc);
307 
313  size_t readEntries(
314  std::function<void(const Entry &entry)> proc,
315  const size_t count);
316 
320  void inspectAndVerify(
321  const Poco::DigestEngine::Digest &digest,
322  FileBufferStat &stat) const;
323 
328  static std::string formatEntries(
329  const std::vector<SensorData> &data);
330 
331  protected:
338  size_t scanEntries(
339  size_t offset,
340  std::function<void(const Entry &entry)> proc,
341  size_t &bytes,
342  const size_t count) const;
343 
350  size_t scanEntries(
351  std::istream &in,
352  std::function<void(const Entry &entry)> proc,
353  size_t &bytes,
354  const size_t count) const;
355 
356  private:
357  Poco::Path m_path;
358  size_t m_offset;
359  size_t m_size;
360  };
361 
366  void registerBuffer(
367  const FileBuffer &buffer,
368  const FileBufferStat &stat);
369 
370 private:
371  Poco::Path m_rootDir;
372  bool m_gcDisabled;
373  bool m_neverDropOldest;
374  ssize_t m_bytesLimit;
375  bool m_ignoreIndexErrors;
376  Journal::Ptr m_index;
377 
382  std::list<FileBuffer> m_buffers;
383 
389  std::map<std::string, FileBuffer> m_exhausted;
390 
394  std::list<Entry> m_entryCache;
395 };
396 
397 }
size_t bytesUsedAll() const
Definition: JournalQueuingStrategy.cpp:666
Journal::Ptr index() const
Definition: JournalQueuingStrategy.cpp:303
size_t readEntries(std::function< void(const Entry &entry)> proc)
Read all entries from the current offset. The offset is updated to point after the read data (even in...
Helper struct with statistics collected during an inspection of a FileBuffer instance.
Definition: JournalQueuingStrategy.h:267
void pop(size_t count) override
Pop the count amount of data off the registered buffers. It updates index accordingly.
Definition: JournalQueuingStrategy.cpp:418
Definition: SensorData.h:20
void setNeverDropOldest(bool neverDrop)
Disable dropping of oldest data. This is useful for debugging or in case of a serious bug in the drop...
Definition: JournalQueuingStrategy.cpp:65
void dropOldestBuffers(const size_t bytes)
Drop oldest valid data to ensure that at least the given bytes amount of space is available for writi...
Definition: JournalQueuingStrategy.cpp:558
static std::string formatEntries(const std::vector< SensorData > &data)
Format the given data into the form as expected by the readEntries() method.
Definition: JournalQueuingStrategy.cpp:854
void setRootDir(const std::string &path)
Set the root directory where to create or use a storage.
Definition: JournalQueuingStrategy.cpp:50
An instance of Entry represents a single record in the FileBuffer. Such record contains a single Sens...
Definition: JournalQueuingStrategy.h:227
bool whipeFile(Poco::File file, bool usuallyFails=false) const
Remove the given file if possible. If usuallyFails is true, than the method does not log errors...
Definition: JournalQueuingStrategy.cpp:220
void setIgnoreIndexErrors(bool ignore)
Configure behaviour of index loading. If the index contains broken entries, we can either fail quickl...
Definition: JournalQueuingStrategy.cpp:84
size_t precacheEntries(size_t count)
The call ensures that there are up to count entries into the m_entryCache. If there is not enough dat...
Definition: JournalQueuingStrategy.cpp:352
bool garbageCollect(const size_t bytes)
Perform garbage collection to ensure that at least the given bytes amount of space is available for w...
Definition: JournalQueuingStrategy.cpp:481
size_t scanEntries(size_t offset, std::function< void(const Entry &entry)> proc, size_t &bytes, const size_t count) const
Scan for up to count entries from the current offset. The parameter bytes is updated continuously whi...
void initIndex(const Poco::Path &index)
Initialize the journaling index either by creating a new empty one or by loading the existing one...
Definition: JournalQueuingStrategy.cpp:89
size_t peek(std::vector< SensorData > &data, size_t count) override
Peek the given count of data off the registered buffers starting from the oldest one. Calling this method is stable (returns the same results) until the pop() method is called. Index is not affected by this call.
Definition: JournalQueuingStrategy.cpp:378
void inspectAndRegisterBuffer(const std::string &name, size_t offset, Poco::Timestamp &newest)
Inspect buffer of the given name.
Definition: JournalQueuingStrategy.cpp:108
bool exhausted() const
Definition: JournalQueuingStrategy.cpp:774
void setBytesLimit(int bytes)
Set top limit for data consumed by the strategy in the filesystem. This counts all data files (also d...
Definition: JournalQueuingStrategy.cpp:70
void reportStats(const Poco::Timestamp &newest) const
Report statistics about buffers.
Definition: JournalQueuingStrategy.cpp:211
void prescanBuffers(Poco::Timestamp &newest, BrokenHandler broken)
Pre-scan all buffers in the index, check their consistency, collect some information (entries counts...
Definition: JournalQueuingStrategy.cpp:140
JournalQueuingStrategy implements persistent temporary storing of SensorData into a filesystem struct...
Definition: JournalQueuingStrategy.h:40
size_t bytesUsed() const
Definition: JournalQueuingStrategy.cpp:644
size_t readEntries(std::function< void(const Entry &entry)> proc, size_t count)
Read up to count entries sequentially from buffers. For each entry, call the given method proc...
Definition: JournalQueuingStrategy.cpp:318
Definition: Loggable.h:19
void collectReferenced(std::set< std::string > &referenced) const
Collect all referenced buffers. Such buffers must not be deleted and should not be affected in anyway...
Definition: JournalQueuingStrategy.cpp:473
Representation of a persistent file buffer that contains entries holding the stored SensorData...
Definition: JournalQueuingStrategy.h:282
Poco::Path pathTo(const std::string &name) const
Definition: JournalQueuingStrategy.cpp:702
void push(const std::vector< SensorData > &data) override
Push the given data into a separate buffer and update the index accordingly. In case of serious failu...
Definition: JournalQueuingStrategy.cpp:308
std::string writeData(const std::string &data, bool force=true) const
Write data safely to a file and return its name.
Definition: JournalQueuingStrategy.cpp:272
Definition: QueuingStrategy.h:18
bool overLimit(size_t bytes) const
Definition: JournalQueuingStrategy.cpp:75
void registerBuffer(const FileBuffer &buffer, const FileBufferStat &stat)
Register the given buffer with the strategy. The index is not updated by this call.
Definition: JournalQueuingStrategy.cpp:246
void setDisableGC(bool disable)
Disable garbage-collection entirely. This is useful for debugging or in case of a serious bug in the ...
Definition: JournalQueuingStrategy.cpp:60
Poco::Timestamp initIndexAndScan(BrokenHandler broken)
Performs all the necessary steps done when calling setup(). The given function is called when a broke...
Definition: JournalQueuingStrategy.cpp:193
static std::string tsString(const Poco::Timestamp &t)
Definition: JournalQueuingStrategy.cpp:707
Poco::Path rootDir() const
Definition: JournalQueuingStrategy.cpp:55
virtual void setup()
Setup the storage for the JournalQueuingStrategy. It creates new index or loads the existing one...
Definition: JournalQueuingStrategy.cpp:181
bool empty() override
The call might call precacheEntries() to load up to 1 entry.
Definition: JournalQueuingStrategy.cpp:370