7 #include <Poco/DigestEngine.h>
10 #include <Poco/Timespan.h>
11 #include <Poco/Timestamp.h>
13 #include "exporters/QueuingStrategy.h"
14 #include "util/Journal.h"
15 #include "util/Loggable.h"
99 bool empty()
override;
106 void push(
const std::vector<SensorData> &data)
override;
117 size_t peek(std::vector<SensorData> &data,
size_t count)
override;
123 void pop(
size_t count)
override;
126 typedef std::function<void(
127 const std::string &name,
129 Poco::Timestamp &newest)> BrokenHandler;
148 void prescanBuffers(Poco::Timestamp &newest, BrokenHandler broken);
153 void reportStats(
const Poco::Timestamp &newest)
const;
159 const std::string &name,
161 Poco::Timestamp &newest);
166 Journal::Ptr
index()
const;
172 const std::string &data,
173 bool force =
true)
const;
179 bool whipeFile(Poco::File file,
bool usuallyFails =
false)
const;
214 Poco::Path
pathTo(
const std::string &name)
const;
219 static std::string
tsString(
const Poco::Timestamp &t);
231 const std::string &buffer,
232 const size_t nextOffset);
235 std::string buffer()
const;
236 size_t nextOffset()
const;
240 std::string m_buffer;
260 std::function<
void(
const Entry &entry)> proc,
268 Poco::Timestamp oldest = Poco::Timestamp::TIMEVAL_MAX;
269 Poco::Timestamp newest = Poco::Timestamp::TIMEVAL_MIN;
275 void update(
const Poco::Timestamp &timestamp);
285 const Poco::Path &path,
289 std::string name()
const;
290 Poco::Path path()
const;
291 size_t offset()
const;
306 std::function<
void(
const Entry &entry)> proc);
314 std::function<
void(
const Entry &entry)> proc,
320 void inspectAndVerify(
321 const Poco::DigestEngine::Digest &digest,
329 const std::vector<SensorData> &data);
340 std::function<
void(
const Entry &entry)> proc,
342 const size_t count)
const;
352 std::function<
void(
const Entry &entry)> proc,
354 const size_t count)
const;
371 Poco::Path m_rootDir;
373 bool m_neverDropOldest;
374 ssize_t m_bytesLimit;
375 bool m_ignoreIndexErrors;
376 Journal::Ptr m_index;
382 std::list<FileBuffer> m_buffers;
389 std::map<std::string, FileBuffer> m_exhausted;
394 std::list<Entry> m_entryCache;
size_t bytesUsedAll() const
Definition: JournalQueuingStrategy.cpp:666
Journal::Ptr index() const
Definition: JournalQueuingStrategy.cpp:303
size_t readEntries(std::function< void(const Entry &entry)> proc)
Read all entries from the current offset. The offset is updated to point after the read data (even in...
Helper struct with statistics collected during an inspection of a FileBuffer instance.
Definition: JournalQueuingStrategy.h:267
void pop(size_t count) override
Pop the count amount of data off the registered buffers. It updates index accordingly.
Definition: JournalQueuingStrategy.cpp:418
Definition: SensorData.h:20
void setNeverDropOldest(bool neverDrop)
Disable dropping of oldest data. This is useful for debugging or in case of a serious bug in the drop...
Definition: JournalQueuingStrategy.cpp:65
void dropOldestBuffers(const size_t bytes)
Drop oldest valid data to ensure that at least the given bytes amount of space is available for writi...
Definition: JournalQueuingStrategy.cpp:558
static std::string formatEntries(const std::vector< SensorData > &data)
Format the given data into the form as expected by the readEntries() method.
Definition: JournalQueuingStrategy.cpp:854
void setRootDir(const std::string &path)
Set the root directory where to create or use a storage.
Definition: JournalQueuingStrategy.cpp:50
An instance of Entry represents a single record in the FileBuffer. Such record contains a single Sens...
Definition: JournalQueuingStrategy.h:227
bool whipeFile(Poco::File file, bool usuallyFails=false) const
Remove the given file if possible. If usuallyFails is true, then the method does not log errors...
Definition: JournalQueuingStrategy.cpp:220
void setIgnoreIndexErrors(bool ignore)
Configure behaviour of index loading. If the index contains broken entries, we can either fail quickl...
Definition: JournalQueuingStrategy.cpp:84
size_t precacheEntries(size_t count)
The call ensures that there are up to count entries in the m_entryCache. If there is not enough dat...
Definition: JournalQueuingStrategy.cpp:352
bool garbageCollect(const size_t bytes)
Perform garbage collection to ensure that at least the given bytes amount of space is available for w...
Definition: JournalQueuingStrategy.cpp:481
size_t scanEntries(size_t offset, std::function< void(const Entry &entry)> proc, size_t &bytes, const size_t count) const
Scan for up to count entries from the current offset. The parameter bytes is updated continuously whi...
void initIndex(const Poco::Path &index)
Initialize the journaling index either by creating a new empty one or by loading the existing one...
Definition: JournalQueuingStrategy.cpp:89
size_t peek(std::vector< SensorData > &data, size_t count) override
Peek the given count of data off the registered buffers starting from the oldest one. Calling this method is stable (returns the same results) until the pop() method is called. Index is not affected by this call.
Definition: JournalQueuingStrategy.cpp:378
void inspectAndRegisterBuffer(const std::string &name, size_t offset, Poco::Timestamp &newest)
Inspect buffer of the given name.
Definition: JournalQueuingStrategy.cpp:108
bool exhausted() const
Definition: JournalQueuingStrategy.cpp:774
void setBytesLimit(int bytes)
Set top limit for data consumed by the strategy in the filesystem. This counts all data files (also d...
Definition: JournalQueuingStrategy.cpp:70
void reportStats(const Poco::Timestamp &newest) const
Report statistics about buffers.
Definition: JournalQueuingStrategy.cpp:211
void prescanBuffers(Poco::Timestamp &newest, BrokenHandler broken)
Pre-scan all buffers in the index, check their consistency, collect some information (entries counts...
Definition: JournalQueuingStrategy.cpp:140
JournalQueuingStrategy implements persistent temporary storing of SensorData into a filesystem struct...
Definition: JournalQueuingStrategy.h:40
size_t bytesUsed() const
Definition: JournalQueuingStrategy.cpp:644
size_t readEntries(std::function< void(const Entry &entry)> proc, size_t count)
Read up to count entries sequentially from buffers. For each entry, call the given method proc...
Definition: JournalQueuingStrategy.cpp:318
Definition: Loggable.h:19
void collectReferenced(std::set< std::string > &referenced) const
Collect all referenced buffers. Such buffers must not be deleted and should not be affected in any way...
Definition: JournalQueuingStrategy.cpp:473
Representation of a persistent file buffer that contains entries holding the stored SensorData...
Definition: JournalQueuingStrategy.h:282
Poco::Path pathTo(const std::string &name) const
Definition: JournalQueuingStrategy.cpp:702
void push(const std::vector< SensorData > &data) override
Push the given data into a separate buffer and update the index accordingly. In case of serious failu...
Definition: JournalQueuingStrategy.cpp:308
std::string writeData(const std::string &data, bool force=true) const
Write data safely to a file and return its name.
Definition: JournalQueuingStrategy.cpp:272
Definition: QueuingStrategy.h:18
bool overLimit(size_t bytes) const
Definition: JournalQueuingStrategy.cpp:75
void registerBuffer(const FileBuffer &buffer, const FileBufferStat &stat)
Register the given buffer with the strategy. The index is not updated by this call.
Definition: JournalQueuingStrategy.cpp:246
void setDisableGC(bool disable)
Disable garbage-collection entirely. This is useful for debugging or in case of a serious bug in the ...
Definition: JournalQueuingStrategy.cpp:60
Poco::Timestamp initIndexAndScan(BrokenHandler broken)
Performs all the necessary steps done when calling setup(). The given function is called when a broke...
Definition: JournalQueuingStrategy.cpp:193
static std::string tsString(const Poco::Timestamp &t)
Definition: JournalQueuingStrategy.cpp:707
Poco::Path rootDir() const
Definition: JournalQueuingStrategy.cpp:55
virtual void setup()
Setup the storage for the JournalQueuingStrategy. It creates new index or loads the existing one...
Definition: JournalQueuingStrategy.cpp:181
bool empty() override
The call might call precacheEntries() to load up to 1 entry.
Definition: JournalQueuingStrategy.cpp:370