Comparar commits

...

2 Commits

Autor SHA1 Mensagem Data
Piotr M 6a8e718d4a add scheduling by modification time 2016-12-15 15:25:01 +01:00
Piotr M 353cd16303 Add folder items scheduler by specific predicate - item sizes ascending 2016-12-05 17:55:55 +01:00
8 arquivos alterados com 182 adições e 43 exclusões
+60 -24
Ver Arquivo
@@ -401,9 +401,16 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
}
foreach(PropagatorJob* it, directoriesToRemove) {
// ensure that these items will go last in the root folder and append
it->setLastOutJobPriority();
_rootJob->append(it);
}
// at this point, all priority values for items in the folders should be set
// update priority attribute for folders containing these items
// this will move _containerJobs to _subJobs updating their evaluation attributes
_rootJob->updateJobPriorityAttributeValues();
connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(_rootJob.data(), SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
@@ -574,20 +581,52 @@ OwncloudPropagator::DiskSpaceResult OwncloudPropagator::diskSpaceCheck() const
// ================================================================================
void PropagateDirectory::insertItemByPriority(PropagatorJob *subJob){
// if job is prioritised, take priority into account, otherwise use modification time
int priority = subJob->getJobPriorityAttributeValue();
if(subJob->_priority == JobPriority::NormalPriority){
// jobs are prioritised by timestamp, thus higher timestamp, higher the priority
// QMultiMap is sorted in increasing order, so substract from max possible quint64
int modificationTimestamp = subJob->getJobPriorityAttributeValue();
priority = std::numeric_limits<quint64>::max() - modificationTimestamp;
}
_subJobs.insertMulti(priority, subJob);
}
void PropagateDirectory::updateJobPriorityAttributeValues()
{
// This uses recursion to perform Depth-First Traversal of the directories with changes trees
// If the given (this) directory contains _containerJobs, it will call updateJob on that child dir job, otherwise does nothing
//this will get the _container jobs within this parent directory and iterate over them
QMutableVectorIterator<PropagatorJob *> containerJobsIterator(_containerJobs);
while (containerJobsIterator.hasNext()) {
//take next Container Job
PropagatorJob * subJob = containerJobsIterator.next();
//ascend to its child folders to update their priorities
subJob->updateJobPriorityAttributeValues();
//at this point, all child folders are updated, add job to _subJobs and remote job from _containerJobs
insertItemByPriority(subJob);
containerJobsIterator.remove();
}
}
PropagatorJob::JobParallelism PropagateDirectory::parallelism()
{
// If any of the non-finished sub jobs is not parallel, we have to wait
// FIXME! we should probably cache this result
if (_firstJob && _firstJob->_state != Finished) {
if (_firstJob->parallelism() != FullParallelism)
return WaitForFinished;
}
// FIXME: use the cached value of finished job
for (int i = 0; i < _subJobs.count(); ++i) {
if (_subJobs.at(i)->_state != Finished && _subJobs.at(i)->parallelism() != FullParallelism) {
QMapIterator<quint64, PropagatorJob *> subJobsIterator(_subJobs);
while (subJobsIterator.hasNext()) {
subJobsIterator.next();
if (subJobsIterator.value()->_state != Finished && subJobsIterator.value()->parallelism() != FullParallelism) {
return WaitForFinished;
}
}
@@ -604,6 +643,11 @@ bool PropagateDirectory::scheduleNextJob()
if (_state == NotYetStarted) {
_state = Running;
// at the begining of the Directory Job, update expected number of Jobs to be synced
_totalJobs = _subJobs.count();
if (_firstJob)
_totalJobs++;
if (!_firstJob && _subJobs.isEmpty()) {
finalize();
return true;
@@ -618,30 +662,27 @@ bool PropagateDirectory::scheduleNextJob()
return false;
}
// cache the value of first unfinished subjob
bool stopAtDirectory = false;
int i = _firstUnfinishedSubJob;
int subJobsCount = _subJobs.count();
while (i < subJobsCount && _subJobs.at(i)->_state == Finished) {
_firstUnfinishedSubJob = ++i;
}
for (int i = _firstUnfinishedSubJob; i < subJobsCount; ++i) {
if (_subJobs.at(i)->_state == Finished) {
QMutableMapIterator<quint64, PropagatorJob *> subJobsIterator(_subJobs);
while (subJobsIterator.hasNext()) {
subJobsIterator.next();
if (subJobsIterator.value()->_state == Finished) {
subJobsIterator.remove();
continue;
}
if (stopAtDirectory && qobject_cast<PropagateDirectory*>(_subJobs.at(i))) {
if (stopAtDirectory && qobject_cast<PropagateDirectory*>(subJobsIterator.value())) {
return false;
}
if (possiblyRunNextJob(_subJobs.at(i))) {
if (possiblyRunNextJob(subJobsIterator.value())) {
return true;
}
Q_ASSERT(_subJobs.at(i)->_state == Running);
Q_ASSERT(subJobsIterator.value()->_state == Running);
auto paral = _subJobs.at(i)->parallelism();
auto paral = subJobsIterator.value()->parallelism();
if (paral == WaitForFinished) {
return false;
}
@@ -666,14 +707,9 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
_runningNow--;
_jobsFinished++;
int totalJobs = _subJobs.count();
if (_firstJob) {
totalJobs++;
}
// We finished processing all the jobs
// check if we finished
if (_jobsFinished >= totalJobs) {
if (_jobsFinished >= _totalJobs) {
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
finalize();
} else {
+106 -11
Ver Arquivo
@@ -15,6 +15,7 @@
#ifndef OWNCLOUDPROPAGATOR_H
#define OWNCLOUDPROPAGATOR_H
#include <limits>
#include <QHash>
#include <QObject>
#include <QMap>
@@ -24,12 +25,12 @@
#include <QPointer>
#include <QIODevice>
#include <QMutex>
#include <QDebug>
#include "syncfileitem.h"
#include "syncjournaldb.h"
#include "bandwidthmanager.h"
#include "accountfwd.h"
namespace OCC {
/** Free disk space threshold below which syncs will abort and not even start.
@@ -60,7 +61,32 @@ protected:
OwncloudPropagator *_propagator;
public:
explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator), _state(NotYetStarted) {}
enum JobPriority {
/**
* Jobs are prioritized, so that they will be executed unconditionaly first,
* according to insertion order withing the items of the same priority
*/
FirstOutPriority,
/**
* Jobs are prioritized, so that they will be executed unconditionaly last,
* according to insertion order withing the items of the same priority
*/
LastOutPriority,
/** Jobs are normaly prioritized, so that they will be executed according to some evaluation attribute represented by integer*/
NormalPriority,
/** To contruct predicate for this Priority, all subitem has to be classified and contenerised */
ContainerItemsPriority,
};
explicit PropagatorJob(OwncloudPropagator* propagator, JobPriority priority) : _propagator(propagator), _priority(priority), _state(NotYetStarted) {}
/*
* Keeps track of the priority of the object
*/
JobPriority _priority;
enum JobState {
NotYetStarted,
@@ -100,6 +126,41 @@ public:
*/
virtual qint64 committedDiskSpace() const { return 0; }
/**
* Returns job priority predicate value according to LastOut or FirstOut priority
* or method can be overriden to return a specific priority attribute value
*
* FirstOut priority will return minimum value for quint64 since QMultiMap keys are sorted in increasing order
*
* This method returns 0 in case of error
*/
virtual quint64 getJobPriorityAttributeValue() const {
if (_priority==JobPriority::FirstOutPriority) {
return std::numeric_limits<quint64>::min();
} else if (_priority==JobPriority::LastOutPriority){
return std::numeric_limits<quint64>::max();
} else {
return 0;
}
}
/**
* Updates job priorities for the given job
*/
virtual void updateJobPriorityAttributeValues() {}
/**
* Enforces FirstOut job priority.
* NOTE: This has to be set before item is being added to _subJobs queue to be propagated!
*/
virtual void setFirstOutJobPriority() { _priority = JobPriority::FirstOutPriority; }
/**
* Enforces LastOut job priority
* NOTE: This has to be set before item is being added to _subJobs queue to be propagated!
*/
virtual void setLastOutJobPriority() { _priority = JobPriority::LastOutPriority; }
public slots:
virtual void abort() {}
@@ -158,14 +219,15 @@ private:
QScopedPointer<PropagateItemJob> _restoreJob;
public:
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItemPtr &item)
: PropagatorJob(propagator), _item(item) {}
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItemPtr &item, JobPriority priority)
: PropagatorJob(propagator, priority), _item(item) {}
bool scheduleNextJob() Q_DECL_OVERRIDE {
if (_state != NotYetStarted) {
return false;
}
_state = Running;
qDebug() << "Modification Time - Priority" << _item->_modtime << "Item" << _item->_file;
QMetaObject::invokeMethod(this, "start"); // We could be in a different thread (neon jobs)
return true;
}
@@ -187,27 +249,45 @@ public:
// e.g: create the directory
QScopedPointer<PropagateItemJob>_firstJob;
// all the sub files or sub directories.
QVector<PropagatorJob *> _subJobs;
/*
* All the sub files or sub directories. This map has to be updated with _containerJobs
* QMultiMap is ordered by the key value, or in case of equal keys (e.g 0) by insertion order.
* <quint64 predicate, PropagatorJob * job>, where predicate is obtained by call to getJobPredicateValue()
*/
QMultiMap<quint64, PropagatorJob *> _subJobs;
/*
* This vector is just temporary structure used to keep the "container" jobs like directory job.
* After the updateJobPredicateValues() is called on this directories, these container jobs will be inserted
* to _subJobs queue. Note: This has to be called after all items are added to the _subJobs for whole the sync!
*/
QVector<PropagatorJob *> _containerJobs;
SyncFileItemPtr _item;
int _jobsFinished; // number of jobs that have completed
int _totalJobs; // number of jobs that will be defined to be synced
int _runningNow; // number of subJobs running right now
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
int _firstUnfinishedSubJob;
explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
: PropagatorJob(propagator)
, _firstJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0)
: PropagatorJob(propagator, JobPriority::ContainerItemsPriority)
, _firstJob(0), _item(item), _jobsFinished(0), _totalJobs(0), _runningNow(0), _hasError(SyncFileItem::NoStatus)
{ }
virtual ~PropagateDirectory() {
qDeleteAll(_subJobs);
qDeleteAll(_containerJobs);
}
void append(PropagatorJob *subJob) {
_subJobs.append(subJob);
// we do not yet have all items in all the folders, so add it temporarly to _containerJobs and move to _subJobs
// directly before start of the sync (after updating priority attributes)
if(subJob->_priority == JobPriority::ContainerItemsPriority){
_containerJobs.append(subJob);
} else {
insertItemByPriority(subJob);
}
}
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
@@ -227,6 +307,21 @@ public:
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
// this item is prioritized normaly, so get priority by its sub items highest modification timestamp (most recent modification)
// because _subJobs are already sorted, take first job priority attribute
quint64 getJobPriorityAttributeValue() const Q_DECL_OVERRIDE {
if (_priority == JobPriority::ContainerItemsPriority && !_subJobs.empty())
return _subJobs.first()->getJobPriorityAttributeValue();
// subJobs empty or forced priority, return priority of folder itself
return PropagatorJob::getJobPriorityAttributeValue();
}
// this method should decide about prioritising single items during their insert
void insertItemByPriority(PropagatorJob *subJob);
// this method should decide about prioritising container items during their insert
void updateJobPriorityAttributeValues();
private slots:
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
@@ -252,7 +347,7 @@ class PropagateIgnoreJob : public PropagateItemJob {
Q_OBJECT
public:
PropagateIgnoreJob(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item) {}
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
void start() Q_DECL_OVERRIDE {
SyncFileItem::Status status = _item->_status;
done(status == SyncFileItem::NoStatus ? SyncFileItem::FileIgnored : status, _item->_errorString);
+4 -1
Ver Arquivo
@@ -110,13 +110,16 @@ class PropagateDownloadFile : public PropagateItemJob {
Q_OBJECT
public:
PropagateDownloadFile(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _resumeStart(0), _downloadProgress(0), _deleteExisting(false) {}
: PropagateItemJob(propagator, item, JobPriority::NormalPriority), _resumeStart(0), _downloadProgress(0), _deleteExisting(false) {}
void start() Q_DECL_OVERRIDE;
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
// We think it might finish quickly because it is a small file.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
// this item is prioritized normaly, so get priority by its modification time
quint64 getJobPriorityAttributeValue() const Q_DECL_OVERRIDE { return _item->_modtime; }
/**
* Whether an existing folder with the same name may be deleted before
* the download.
+1 -1
Ver Arquivo
@@ -48,7 +48,7 @@ class PropagateRemoteDelete : public PropagateItemJob {
QPointer<DeleteJob> _job;
public:
PropagateRemoteDelete (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item) {}
: PropagateItemJob(propagator, item, JobPriority::LastOutPriority) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
+1 -1
Ver Arquivo
@@ -29,7 +29,7 @@ class PropagateRemoteMkdir : public PropagateItemJob {
friend class PropagateDirectory; // So it can access the _item;
public:
PropagateRemoteMkdir (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _deleteExisting(false) {}
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority), _deleteExisting(false) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
+1 -1
Ver Arquivo
@@ -51,7 +51,7 @@ class PropagateRemoteMove : public PropagateItemJob {
QPointer<MoveJob> _job;
public:
PropagateRemoteMove (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item) {}
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinishedInParentDirectory; }
+4 -1
Ver Arquivo
@@ -197,7 +197,7 @@ protected:
public:
PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {}
: PropagateItemJob(propagator, item, JobPriority::NormalPriority), _finished(false), _deleteExisting(false) {}
/**
* Whether an existing entity with the same name may be deleted before
@@ -211,6 +211,9 @@ public:
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
// this item is prioritized normaly, so get priority by its modification time
quint64 getJobPriorityAttributeValue() const Q_DECL_OVERRIDE { return _item->_modtime; }
private slots:
void slotComputeContentChecksum();
// Content checksum computed, compute the transmission checksum
+5 -3
Ver Arquivo
@@ -40,7 +40,8 @@ static const char checkSumAdlerC[] = "Adler32";
class PropagateLocalRemove : public PropagateItemJob {
Q_OBJECT
public:
PropagateLocalRemove (OwncloudPropagator* propagator,const SyncFileItemPtr& item) : PropagateItemJob(propagator, item) {}
PropagateLocalRemove (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
void start() Q_DECL_OVERRIDE;
private:
bool removeRecursively(const QString &path);
@@ -55,7 +56,7 @@ class PropagateLocalMkdir : public PropagateItemJob {
Q_OBJECT
public:
PropagateLocalMkdir (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item), _deleteExistingFile(false) {}
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority), _deleteExistingFile(false) {}
void start() Q_DECL_OVERRIDE;
/**
@@ -77,7 +78,8 @@ private:
class PropagateLocalRename : public PropagateItemJob {
Q_OBJECT
public:
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItemPtr& item) : PropagateItemJob(propagator, item) {}
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
void start() Q_DECL_OVERRIDE;
JobParallelism parallelism() Q_DECL_OVERRIDE { return WaitForFinishedInParentDirectory; }
};