Comparar commits

...

1 Commits

Autor SHA1 Mensagem Data
Piotr M 5aa1629440 remove excesive loops and add sync files scheduler 2016-10-25 00:23:50 +02:00
2 arquivos alterados com 169 adições e 5 exclusões
+93 -2
Ver Arquivo
@@ -384,6 +384,8 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
// will delete directories, so defer execution
directoriesToRemove.prepend(current);
removedDirectory = item->_file + "/";
} else if (item->_instruction == CSYNC_INSTRUCTION_NEW || item->_instruction == CSYNC_INSTRUCTION_SYNC){
directories.top().second->appendSyncJob(current);
} else {
directories.top().second->append(current);
}
@@ -593,7 +595,11 @@ bool PropagateDirectory::scheduleNextJob()
_state = Running;
if (!_firstJob && _subJobs.isEmpty()) {
finalize();
if (!_scheduledSyncJobs && _syncJobsScheduler) {
scheduleSyncJobs();
} else {
finalize();
}
return true;
}
}
@@ -657,12 +663,23 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
// check if we finished
if (_jobsFinished >= totalJobs) {
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
finalize();
if (!_scheduledSyncJobs && _syncJobsScheduler) {
scheduleSyncJobs();
} else {
finalize();
}
} else {
emit ready();
}
}
void PropagateDirectory::scheduleSyncJobs() {
_scheduledSyncJobs = true;
//at this point, scheduler will be destroyed together with _subJobs
_subJobs.append(_syncJobsScheduler.data());
emit ready();
}
void PropagateDirectory::finalize()
{
bool ok = true;
@@ -711,6 +728,80 @@ qint64 PropagateDirectory::committedDiskSpace() const
return needed;
}
// ================================================================================
PropagatorJob::JobParallelism PropagateSyncItems::parallelism()
{
return FullParallelism;
}
bool PropagateSyncItems::scheduleNextJob()
{
if (_state == Finished) {
return false;
}
if (_state == NotYetStarted) {
_state = Running;
if (_syncJobs.isEmpty()) {
finalize();
return true;
}
}
while (!_syncJobs.isEmpty()) {
PropagatorJob *next = _syncJobs.takeFirst();
if (next->_state == Finished) {
continue;
}
if (possiblyRunNextJob(next)) {
return true;
}
Q_ASSERT(next->_state == Running);
}
return false;
}
void PropagateSyncItems::slotSubJobFinished(SyncFileItem::Status status)
{
if (status == SyncFileItem::FatalError) {
abort();
_hasError = status;
finalize();
return;
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
_hasError = status;
}
_jobsFinished++;
// We finished processing all the jobs
// check if we finished
if (_syncJobs.isEmpty() && (_syncJobsCount == _jobsFinished)) {
finalize();
} else {
emit ready();
}
}
void PropagateSyncItems::finalize()
{
_state = Finished;
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
}
qint64 PropagateSyncItems::committedDiskSpace() const
{
qint64 needed = 0;
foreach (PropagatorJob* job, _syncJobs) {
needed += job->committedDiskSpace();
}
return needed;
}
CleanupPollsJob::~CleanupPollsJob()
{}
+76 -3
Ver Arquivo
@@ -176,6 +176,59 @@ public slots:
virtual void start() = 0;
};
/**
* @brief Propagate sync items within a specific folder, and all its sub entries.
* @ingroup libsync
*/
class PropagateSyncItems : public PropagatorJob {
Q_OBJECT
public:
// all the sub files or sub directories.
QList<PropagatorJob *> _syncJobs;
int _jobsFinished; // number of jobs that have completed
int _syncJobsCount; // number of subJobs running right now
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
PropagateSyncItems(OwncloudPropagator *propagator)
: PropagatorJob(propagator)
, _jobsFinished(0), _syncJobsCount(0), _hasError(SyncFileItem::NoStatus)
{ }
virtual ~PropagateSyncItems() {
qDeleteAll(_syncJobs);
}
void append(PropagatorJob *subJob) {
_syncJobs.append(subJob);
_syncJobsCount++;
}
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
virtual void abort() Q_DECL_OVERRIDE {
foreach (PropagatorJob *j, _syncJobs)
j->abort();
}
void finalize();
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
private slots:
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
connect(next, SIGNAL(ready()), this, SIGNAL(ready()));
}
return next->scheduleNextJob();
}
void slotSubJobFinished(SyncFileItem::Status status);
};
/**
* @brief Propagate a directory, and all its sub entries.
@@ -187,7 +240,12 @@ public:
// e.g: create the directory
QScopedPointer<PropagateItemJob>_firstJob;
// all the sub files or sub directories.
// all the new and changed files without conflicts scheduler class
// remark: do not QScopedPointer, since this class is either deleted via qDeleteAll(_subJobs) or usual delete,
// depending on ownership determined by flag _scheduledSyncJobs
QPointer<PropagateSyncItems> _syncJobsScheduler;
// all the other file operation or sub directories.
QVector<PropagatorJob *> _subJobs;
SyncFileItemPtr _item;
@@ -195,13 +253,18 @@ public:
int _jobsFinished; // number of jobs that have completed
int _runningNow; // number of subJobs running right now
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
bool _scheduledSyncJobs; // verify if already scheduled execution of files sync jobs
explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
: PropagatorJob(propagator)
, _firstJob(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus)
, _firstJob(0), _syncJobsScheduler(0), _item(item), _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _scheduledSyncJobs(false)
{ }
virtual ~PropagateDirectory() {
// check whether the owner of the pointer is _subJobs or PropagateDirectory class
if (!_scheduledSyncJobs){
delete _syncJobsScheduler;
}
qDeleteAll(_subJobs);
}
@@ -209,11 +272,20 @@ public:
_subJobs.append(subJob);
}
void appendSyncJob(PropagatorJob *subJob) {
if (!_syncJobsScheduler) {
_syncJobsScheduler = new PropagateSyncItems(_propagator);
}
_syncJobsScheduler->append(subJob);
}
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
virtual void abort() Q_DECL_OVERRIDE {
if (_firstJob)
_firstJob->abort();
if (_syncJobsScheduler)
_syncJobsScheduler->abort();
foreach (PropagatorJob *j, _subJobs)
j->abort();
}
@@ -224,6 +296,8 @@ public:
void finalize();
void scheduleSyncJobs();
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
private slots:
@@ -242,7 +316,6 @@ private slots:
void slotSubJobFinished(SyncFileItem::Status status);
};
/**
* @brief Dummy job that just mark it as completed and ignored
* @ingroup libsync