Comparar commits
2 Commits
2.3.0-beta1
...
scheduler
| Autor | SHA1 | Data | |
|---|---|---|---|
| 6a8e718d4a | |||
| 353cd16303 |
@@ -401,9 +401,16 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
}
|
||||
|
||||
foreach(PropagatorJob* it, directoriesToRemove) {
|
||||
// ensure that these items will go last in the root folder and append
|
||||
it->setLastOutJobPriority();
|
||||
_rootJob->append(it);
|
||||
}
|
||||
|
||||
// at this point, all priority values for items in the folders should be set
|
||||
// update priority attribute for folders containing these items
|
||||
// this will move _containerJobs to _subJobs updating their evaluation attributes
|
||||
_rootJob->updateJobPriorityAttributeValues();
|
||||
|
||||
connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
|
||||
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
|
||||
connect(_rootJob.data(), SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
|
||||
@@ -574,20 +581,52 @@ OwncloudPropagator::DiskSpaceResult OwncloudPropagator::diskSpaceCheck() const
|
||||
|
||||
// ================================================================================
|
||||
|
||||
void PropagateDirectory::insertItemByPriority(PropagatorJob *subJob){
|
||||
// if job is prioritised, take priority into account, otherwise use modification time
|
||||
int priority = subJob->getJobPriorityAttributeValue();
|
||||
|
||||
if(subJob->_priority == JobPriority::NormalPriority){
|
||||
// jobs are prioritised by timestamp, thus higher timestamp, higher the priority
|
||||
// QMultiMap is sorted in increasing order, so substract from max possible quint64
|
||||
int modificationTimestamp = subJob->getJobPriorityAttributeValue();
|
||||
priority = std::numeric_limits<quint64>::max() - modificationTimestamp;
|
||||
}
|
||||
|
||||
_subJobs.insertMulti(priority, subJob);
|
||||
}
|
||||
|
||||
void PropagateDirectory::updateJobPriorityAttributeValues()
|
||||
{
|
||||
// This uses recursion to perform Depth-First Traversal of the directories with changes trees
|
||||
// If the given (this) directory contains _containerJobs, it will call updateJob on that child dir job, otherwise does nothing
|
||||
|
||||
//this will get the _container jobs within this parent directory and iterate over them
|
||||
QMutableVectorIterator<PropagatorJob *> containerJobsIterator(_containerJobs);
|
||||
while (containerJobsIterator.hasNext()) {
|
||||
//take next Container Job
|
||||
PropagatorJob * subJob = containerJobsIterator.next();
|
||||
|
||||
//ascend to its child folders to update their priorities
|
||||
subJob->updateJobPriorityAttributeValues();
|
||||
|
||||
//at this point, all child folders are updated, add job to _subJobs and remote job from _containerJobs
|
||||
insertItemByPriority(subJob);
|
||||
containerJobsIterator.remove();
|
||||
}
|
||||
}
|
||||
|
||||
PropagatorJob::JobParallelism PropagateDirectory::parallelism()
|
||||
{
|
||||
// If any of the non-finished sub jobs is not parallel, we have to wait
|
||||
|
||||
// FIXME! we should probably cache this result
|
||||
|
||||
if (_firstJob && _firstJob->_state != Finished) {
|
||||
if (_firstJob->parallelism() != FullParallelism)
|
||||
return WaitForFinished;
|
||||
}
|
||||
|
||||
// FIXME: use the cached value of finished job
|
||||
for (int i = 0; i < _subJobs.count(); ++i) {
|
||||
if (_subJobs.at(i)->_state != Finished && _subJobs.at(i)->parallelism() != FullParallelism) {
|
||||
QMapIterator<quint64, PropagatorJob *> subJobsIterator(_subJobs);
|
||||
while (subJobsIterator.hasNext()) {
|
||||
subJobsIterator.next();
|
||||
if (subJobsIterator.value()->_state != Finished && subJobsIterator.value()->parallelism() != FullParallelism) {
|
||||
return WaitForFinished;
|
||||
}
|
||||
}
|
||||
@@ -604,6 +643,11 @@ bool PropagateDirectory::scheduleNextJob()
|
||||
if (_state == NotYetStarted) {
|
||||
_state = Running;
|
||||
|
||||
// at the begining of the Directory Job, update expected number of Jobs to be synced
|
||||
_totalJobs = _subJobs.count();
|
||||
if (_firstJob)
|
||||
_totalJobs++;
|
||||
|
||||
if (!_firstJob && _subJobs.isEmpty()) {
|
||||
finalize();
|
||||
return true;
|
||||
@@ -618,30 +662,27 @@ bool PropagateDirectory::scheduleNextJob()
|
||||
return false;
|
||||
}
|
||||
|
||||
// cache the value of first unfinished subjob
|
||||
bool stopAtDirectory = false;
|
||||
int i = _firstUnfinishedSubJob;
|
||||
int subJobsCount = _subJobs.count();
|
||||
while (i < subJobsCount && _subJobs.at(i)->_state == Finished) {
|
||||
_firstUnfinishedSubJob = ++i;
|
||||
}
|
||||
|
||||
for (int i = _firstUnfinishedSubJob; i < subJobsCount; ++i) {
|
||||
if (_subJobs.at(i)->_state == Finished) {
|
||||
QMutableMapIterator<quint64, PropagatorJob *> subJobsIterator(_subJobs);
|
||||
while (subJobsIterator.hasNext()) {
|
||||
subJobsIterator.next();
|
||||
if (subJobsIterator.value()->_state == Finished) {
|
||||
subJobsIterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (stopAtDirectory && qobject_cast<PropagateDirectory*>(_subJobs.at(i))) {
|
||||
if (stopAtDirectory && qobject_cast<PropagateDirectory*>(subJobsIterator.value())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (possiblyRunNextJob(_subJobs.at(i))) {
|
||||
if (possiblyRunNextJob(subJobsIterator.value())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Q_ASSERT(_subJobs.at(i)->_state == Running);
|
||||
Q_ASSERT(subJobsIterator.value()->_state == Running);
|
||||
|
||||
auto paral = _subJobs.at(i)->parallelism();
|
||||
auto paral = subJobsIterator.value()->parallelism();
|
||||
if (paral == WaitForFinished) {
|
||||
return false;
|
||||
}
|
||||
@@ -666,14 +707,9 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
|
||||
_runningNow--;
|
||||
_jobsFinished++;
|
||||
|
||||
int totalJobs = _subJobs.count();
|
||||
if (_firstJob) {
|
||||
totalJobs++;
|
||||
}
|
||||
|
||||
// We finished processing all the jobs
|
||||
// check if we finished
|
||||
if (_jobsFinished >= totalJobs) {
|
||||
if (_jobsFinished >= _totalJobs) {
|
||||
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
|
||||
finalize();
|
||||
} else {
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#ifndef OWNCLOUDPROPAGATOR_H
|
||||
#define OWNCLOUDPROPAGATOR_H
|
||||
|
||||
#include <limits>
|
||||
#include <QHash>
|
||||
#include <QObject>
|
||||
#include <QMap>
|
||||
@@ -24,12 +25,12 @@
|
||||
#include <QPointer>
|
||||
#include <QIODevice>
|
||||
#include <QMutex>
|
||||
#include <QDebug>
|
||||
|
||||
#include "syncfileitem.h"
|
||||
#include "syncjournaldb.h"
|
||||
#include "bandwidthmanager.h"
|
||||
#include "accountfwd.h"
|
||||
|
||||
namespace OCC {
|
||||
|
||||
/** Free disk space threshold below which syncs will abort and not even start.
|
||||
@@ -60,7 +61,32 @@ protected:
|
||||
OwncloudPropagator *_propagator;
|
||||
|
||||
public:
|
||||
explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator), _state(NotYetStarted) {}
|
||||
enum JobPriority {
|
||||
/**
|
||||
* Jobs are prioritized, so that they will be executed unconditionaly first,
|
||||
* according to insertion order withing the items of the same priority
|
||||
*/
|
||||
FirstOutPriority,
|
||||
|
||||
/**
|
||||
* Jobs are prioritized, so that they will be executed unconditionaly last,
|
||||
* according to insertion order withing the items of the same priority
|
||||
*/
|
||||
LastOutPriority,
|
||||
|
||||
/** Jobs are normaly prioritized, so that they will be executed according to some evaluation attribute represented by integer*/
|
||||
NormalPriority,
|
||||
|
||||
/** To contruct predicate for this Priority, all subitem has to be classified and contenerised */
|
||||
ContainerItemsPriority,
|
||||
};
|
||||
|
||||
explicit PropagatorJob(OwncloudPropagator* propagator, JobPriority priority) : _propagator(propagator), _priority(priority), _state(NotYetStarted) {}
|
||||
|
||||
/*
|
||||
* Keeps track of the priority of the object
|
||||
*/
|
||||
JobPriority _priority;
|
||||
|
||||
enum JobState {
|
||||
NotYetStarted,
|
||||
@@ -100,6 +126,41 @@ public:
|
||||
*/
|
||||
virtual qint64 committedDiskSpace() const { return 0; }
|
||||
|
||||
/**
|
||||
* Returns job priority predicate value according to LastOut or FirstOut priority
|
||||
* or method can be overriden to return a specific priority attribute value
|
||||
*
|
||||
* FirstOut priority will return minimum value for quint64 since QMultiMap keys are sorted in increasing order
|
||||
*
|
||||
* This method returns 0 in case of error
|
||||
*/
|
||||
virtual quint64 getJobPriorityAttributeValue() const {
|
||||
if (_priority==JobPriority::FirstOutPriority) {
|
||||
return std::numeric_limits<quint64>::min();
|
||||
} else if (_priority==JobPriority::LastOutPriority){
|
||||
return std::numeric_limits<quint64>::max();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates job priorities for the given job
|
||||
*/
|
||||
virtual void updateJobPriorityAttributeValues() {}
|
||||
|
||||
/**
|
||||
* Enforces FirstOut job priority.
|
||||
* NOTE: This has to be set before item is being added to _subJobs queue to be propagated!
|
||||
*/
|
||||
virtual void setFirstOutJobPriority() { _priority = JobPriority::FirstOutPriority; }
|
||||
|
||||
/**
|
||||
* Enforces LastOut job priority
|
||||
* NOTE: This has to be set before item is being added to _subJobs queue to be propagated!
|
||||
*/
|
||||
virtual void setLastOutJobPriority() { _priority = JobPriority::LastOutPriority; }
|
||||
|
||||
public slots:
|
||||
virtual void abort() {}
|
||||
|
||||
@@ -158,14 +219,15 @@ private:
|
||||
QScopedPointer<PropagateItemJob> _restoreJob;
|
||||
|
||||
public:
|
||||
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItemPtr &item)
|
||||
: PropagatorJob(propagator), _item(item) {}
|
||||
PropagateItemJob(OwncloudPropagator* propagator, const SyncFileItemPtr &item, JobPriority priority)
|
||||
: PropagatorJob(propagator, priority), _item(item) {}
|
||||
|
||||
bool scheduleNextJob() Q_DECL_OVERRIDE {
|
||||
if (_state != NotYetStarted) {
|
||||
return false;
|
||||
}
|
||||
_state = Running;
|
||||
qDebug() << "Modification Time - Priority" << _item->_modtime << "Item" << _item->_file;
|
||||
QMetaObject::invokeMethod(this, "start"); // We could be in a different thread (neon jobs)
|
||||
return true;
|
||||
}
|
||||
@@ -187,27 +249,45 @@ public:
|
||||
// e.g: create the directory
|
||||
QScopedPointer<PropagateItemJob>_firstJob;
|
||||
|
||||
// all the sub files or sub directories.
|
||||
QVector<PropagatorJob *> _subJobs;
|
||||
/*
|
||||
* All the sub files or sub directories. This map has to be updated with _containerJobs
|
||||
* QMultiMap is ordered by the key value, or in case of equal keys (e.g 0) by insertion order.
|
||||
* <quint64 predicate, PropagatorJob * job>, where predicate is obtained by call to getJobPredicateValue()
|
||||
*/
|
||||
QMultiMap<quint64, PropagatorJob *> _subJobs;
|
||||
|
||||
/*
|
||||
* This vector is just temporary structure used to keep the "container" jobs like directory job.
|
||||
* After the updateJobPredicateValues() is called on this directories, these container jobs will be inserted
|
||||
* to _subJobs queue. Note: This has to be called after all items are added to the _subJobs for whole the sync!
|
||||
*/
|
||||
QVector<PropagatorJob *> _containerJobs;
|
||||
|
||||
SyncFileItemPtr _item;
|
||||
|
||||
int _jobsFinished; // number of jobs that have completed
|
||||
int _totalJobs; // number of jobs that will be defined to be synced
|
||||
int _runningNow; // number of subJobs running right now
|
||||
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
|
||||
int _firstUnfinishedSubJob;
|
||||
|
||||
explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
|
||||
: PropagatorJob(propagator)
|
||||
, _firstJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0)
|
||||
: PropagatorJob(propagator, JobPriority::ContainerItemsPriority)
|
||||
, _firstJob(0), _item(item), _jobsFinished(0), _totalJobs(0), _runningNow(0), _hasError(SyncFileItem::NoStatus)
|
||||
{ }
|
||||
|
||||
virtual ~PropagateDirectory() {
|
||||
qDeleteAll(_subJobs);
|
||||
qDeleteAll(_containerJobs);
|
||||
}
|
||||
|
||||
void append(PropagatorJob *subJob) {
|
||||
_subJobs.append(subJob);
|
||||
// we do not yet have all items in all the folders, so add it temporarly to _containerJobs and move to _subJobs
|
||||
// directly before start of the sync (after updating priority attributes)
|
||||
if(subJob->_priority == JobPriority::ContainerItemsPriority){
|
||||
_containerJobs.append(subJob);
|
||||
} else {
|
||||
insertItemByPriority(subJob);
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
|
||||
@@ -227,6 +307,21 @@ public:
|
||||
|
||||
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
|
||||
|
||||
// this item is prioritized normaly, so get priority by its sub items highest modification timestamp (most recent modification)
|
||||
// because _subJobs are already sorted, take first job priority attribute
|
||||
quint64 getJobPriorityAttributeValue() const Q_DECL_OVERRIDE {
|
||||
if (_priority == JobPriority::ContainerItemsPriority && !_subJobs.empty())
|
||||
return _subJobs.first()->getJobPriorityAttributeValue();
|
||||
|
||||
// subJobs empty or forced priority, return priority of folder itself
|
||||
return PropagatorJob::getJobPriorityAttributeValue();
|
||||
}
|
||||
|
||||
// this method should decide about prioritising single items during their insert
|
||||
void insertItemByPriority(PropagatorJob *subJob);
|
||||
|
||||
// this method should decide about prioritising container items during their insert
|
||||
void updateJobPriorityAttributeValues();
|
||||
private slots:
|
||||
bool possiblyRunNextJob(PropagatorJob *next) {
|
||||
if (next->_state == NotYetStarted) {
|
||||
@@ -252,7 +347,7 @@ class PropagateIgnoreJob : public PropagateItemJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
PropagateIgnoreJob(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
|
||||
void start() Q_DECL_OVERRIDE {
|
||||
SyncFileItem::Status status = _item->_status;
|
||||
done(status == SyncFileItem::NoStatus ? SyncFileItem::FileIgnored : status, _item->_errorString);
|
||||
|
||||
@@ -110,13 +110,16 @@ class PropagateDownloadFile : public PropagateItemJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
PropagateDownloadFile(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item), _resumeStart(0), _downloadProgress(0), _deleteExisting(false) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::NormalPriority), _resumeStart(0), _downloadProgress(0), _deleteExisting(false) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
|
||||
|
||||
// We think it might finish quickly because it is a small file.
|
||||
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
|
||||
|
||||
// this item is prioritized normaly, so get priority by its modification time
|
||||
quint64 getJobPriorityAttributeValue() const Q_DECL_OVERRIDE { return _item->_modtime; }
|
||||
|
||||
/**
|
||||
* Whether an existing folder with the same name may be deleted before
|
||||
* the download.
|
||||
|
||||
@@ -48,7 +48,7 @@ class PropagateRemoteDelete : public PropagateItemJob {
|
||||
QPointer<DeleteJob> _job;
|
||||
public:
|
||||
PropagateRemoteDelete (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::LastOutPriority) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
void abort() Q_DECL_OVERRIDE;
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ class PropagateRemoteMkdir : public PropagateItemJob {
|
||||
friend class PropagateDirectory; // So it can access the _item;
|
||||
public:
|
||||
PropagateRemoteMkdir (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item), _deleteExisting(false) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority), _deleteExisting(false) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
void abort() Q_DECL_OVERRIDE;
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ class PropagateRemoteMove : public PropagateItemJob {
|
||||
QPointer<MoveJob> _job;
|
||||
public:
|
||||
PropagateRemoteMove (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
void abort() Q_DECL_OVERRIDE;
|
||||
JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinishedInParentDirectory; }
|
||||
|
||||
@@ -197,7 +197,7 @@ protected:
|
||||
|
||||
public:
|
||||
PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::NormalPriority), _finished(false), _deleteExisting(false) {}
|
||||
|
||||
/**
|
||||
* Whether an existing entity with the same name may be deleted before
|
||||
@@ -211,6 +211,9 @@ public:
|
||||
|
||||
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
|
||||
|
||||
// this item is prioritized normaly, so get priority by its modification time
|
||||
quint64 getJobPriorityAttributeValue() const Q_DECL_OVERRIDE { return _item->_modtime; }
|
||||
|
||||
private slots:
|
||||
void slotComputeContentChecksum();
|
||||
// Content checksum computed, compute the transmission checksum
|
||||
|
||||
@@ -40,7 +40,8 @@ static const char checkSumAdlerC[] = "Adler32";
|
||||
class PropagateLocalRemove : public PropagateItemJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
PropagateLocalRemove (OwncloudPropagator* propagator,const SyncFileItemPtr& item) : PropagateItemJob(propagator, item) {}
|
||||
PropagateLocalRemove (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
private:
|
||||
bool removeRecursively(const QString &path);
|
||||
@@ -55,7 +56,7 @@ class PropagateLocalMkdir : public PropagateItemJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
PropagateLocalMkdir (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item), _deleteExistingFile(false) {}
|
||||
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority), _deleteExistingFile(false) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
|
||||
/**
|
||||
@@ -77,7 +78,8 @@ private:
|
||||
class PropagateLocalRename : public PropagateItemJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItemPtr& item) : PropagateItemJob(propagator, item) {}
|
||||
PropagateLocalRename (OwncloudPropagator* propagator,const SyncFileItemPtr& item)
|
||||
: PropagateItemJob(propagator, item, JobPriority::FirstOutPriority) {}
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
JobParallelism parallelism() Q_DECL_OVERRIDE { return WaitForFinishedInParentDirectory; }
|
||||
};
|
||||
|
||||
Referência em uma Nova Issue
Bloquear um usuário