Comparar commits

...

1 Commits

Autor SHA1 Mensagem Data
Piotr M 5b0141cfed Separate sync specific jobs from data transfer jobs 2016-12-21 11:57:20 +01:00
2 arquivos alterados com 163 adições e 1 exclusões
+108
Ver Arquivo
@@ -308,11 +308,18 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
* In order to do that we loop over the items. (which are sorted by destination)
* When we enter a directory, we can create the directory job and push it on the stack. */
/// root job is initialized as PropagateDirectory
_rootJob.reset(new PropagateDirectory(this));
QStack<QPair<QString /* directory name */, PropagateDirectory* /* job */> > directories;
directories.push(qMakePair(QString(), _rootJob.data()));
QVector<PropagatorJob*> directoriesToRemove;
QString removedDirectory;
PropagateDataTransfers* dataTransfers = new PropagateDataTransfers(this);
/// directories will be used to create a sync logic
/// this will create/update logical directories structure
foreach(const SyncFileItemPtr &item, items) {
if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) {
@@ -389,7 +396,12 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
currentDirJob->append(dir);
}
directories.push(qMakePair(item->destination() + "/" , dir));
} else if (item->_instruction == CSYNC_INSTRUCTION_NEW
|| item->_instruction == CSYNC_INSTRUCTION_SYNC){
// for new/updated files add this to data transfers job
dataTransfers->append(createJob(item));
} else if (PropagateItemJob* current = createJob(item)) {
// for conflics, ignore, move, delete etc sync items, add the to directories logic
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
directoriesToRemove.prepend(current);
@@ -400,6 +412,11 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
}
}
/// dataTransfers will be used to create a data transfer logic
/// create/update files withing already created/updated directories structure
_rootJob->append(dataTransfers);
/// directories to remove have to go last, nice there could be moves
foreach(PropagatorJob* it, directoriesToRemove) {
_rootJob->append(it);
}
@@ -729,6 +746,97 @@ qint64 PropagateDirectory::committedDiskSpace() const
return needed;
}
// ================================================================================
PropagatorJob::JobParallelism PropagateDataTransfers::parallelism()
{
// Upload and Download jobs parallelism is FullParallelism
return FullParallelism;
}
bool PropagateDataTransfers::scheduleNextJob()
{
if (_state == Finished) {
return false;
}
if (_state == NotYetStarted) {
_state = Running;
// at the begining of the Directory Job, update expected number of Jobs to be synced
_totalJobs = _subJobs.count();
if (_subJobs.isEmpty()) {
finalize();
return true;
}
}
QMutableListIterator<PropagatorJob *> subJobsIterator(_subJobs);
while (subJobsIterator.hasNext()) {
subJobsIterator.next();
// get the state of the state of the sub job pointed by call next()
// peekPrevious() will directly access the item through hash in the QList at that subjob
if (subJobsIterator.peekPrevious()->_state == Finished) {
// if this items is finish, remove it from the _subJobs list as it is not needed anymore
subJobsIterator.remove();
continue;
}
if (possiblyRunNextJob(subJobsIterator.peekPrevious())) {
return true;
}
Q_ASSERT(subJobsIterator.peekPrevious()->_state == Running);
}
return false;
}
void PropagateDataTransfers::slotSubJobFinished(SyncFileItem::Status status)
{
if (status == SyncFileItem::FatalError) {
abort();
_state = Finished;
emit finished(status);
return;
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
_hasError = status;
}
_runningNow--;
_jobsFinished++;
// We finished processing all the jobs
// check if we finished
if (_jobsFinished >= _totalJobs) {
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
finalize();
} else {
emit ready();
}
}
void PropagateDataTransfers::finalize()
{
_state = Finished;
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
}
qint64 PropagateDataTransfers::committedDiskSpace() const
{
qint64 needed = 0;
foreach (PropagatorJob* job, _subJobs) {
needed += job->committedDiskSpace();
}
return needed;
}
// ================================================================================
CleanupPollsJob::~CleanupPollsJob()
{}
+55 -1
Ver Arquivo
@@ -178,7 +178,7 @@ public slots:
/**
* @brief Propagate a directory, and all its sub entries.
* @brief Propagate a directory, and all its sub entries which are not uploads/downloads.
* @ingroup libsync
*/
class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
@@ -243,6 +243,60 @@ private slots:
void slotSubJobFinished(SyncFileItem::Status status);
};
/**
* @brief Propagate new/updates of files to be uploaded/downloaded
* @ingroup libsync
*/
class OWNCLOUDSYNC_EXPORT PropagateDataTransfers : public PropagatorJob {
Q_OBJECT
public:
// all the file uploads and downloads
QList<PropagatorJob *> _subJobs;
int _jobsFinished; // number of jobs that have completed
int _runningNow; // number of subJobs running right now
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
int _totalJobs;
explicit PropagateDataTransfers(OwncloudPropagator *propagator)
: PropagatorJob(propagator)
, _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0)
{ }
virtual ~PropagateDataTransfers() {
qDeleteAll(_subJobs);
}
void append(PropagatorJob *subJob) {
_subJobs.append(subJob);
}
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
virtual void abort() Q_DECL_OVERRIDE {
foreach (PropagatorJob *j, _subJobs)
j->abort();
}
void finalize();
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
private slots:
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
connect(next, SIGNAL(ready()), this, SIGNAL(ready()));
_runningNow++;
}
return next->scheduleNextJob();
}
void slotSubJobFinished(SyncFileItem::Status status);
};
/**
* @brief Dummy job that just mark it as completed and ignored