Comparar commits
1 Commits
| Autor | SHA1 | Data | |
|---|---|---|---|
| 5b0141cfed |
@@ -308,11 +308,18 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
* In order to do that we loop over the items. (which are sorted by destination)
|
||||
* When we enter a directory, we can create the directory job and push it on the stack. */
|
||||
|
||||
/// root job is initialized as PropagateDirectory
|
||||
_rootJob.reset(new PropagateDirectory(this));
|
||||
|
||||
QStack<QPair<QString /* directory name */, PropagateDirectory* /* job */> > directories;
|
||||
directories.push(qMakePair(QString(), _rootJob.data()));
|
||||
QVector<PropagatorJob*> directoriesToRemove;
|
||||
QString removedDirectory;
|
||||
|
||||
PropagateDataTransfers* dataTransfers = new PropagateDataTransfers(this);
|
||||
|
||||
/// directories will be used to create a sync logic
|
||||
/// this will create/update logical directories structure
|
||||
foreach(const SyncFileItemPtr &item, items) {
|
||||
|
||||
if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) {
|
||||
@@ -389,7 +396,12 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
currentDirJob->append(dir);
|
||||
}
|
||||
directories.push(qMakePair(item->destination() + "/" , dir));
|
||||
} else if (item->_instruction == CSYNC_INSTRUCTION_NEW
|
||||
|| item->_instruction == CSYNC_INSTRUCTION_SYNC){
|
||||
// for new/updated files add this to data transfers job
|
||||
dataTransfers->append(createJob(item));
|
||||
} else if (PropagateItemJob* current = createJob(item)) {
|
||||
// for conflics, ignore, move, delete etc sync items, add the to directories logic
|
||||
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
|
||||
// will delete directories, so defer execution
|
||||
directoriesToRemove.prepend(current);
|
||||
@@ -400,6 +412,11 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
}
|
||||
}
|
||||
|
||||
/// dataTransfers will be used to create a data transfer logic
|
||||
/// create/update files withing already created/updated directories structure
|
||||
_rootJob->append(dataTransfers);
|
||||
|
||||
/// directories to remove have to go last, nice there could be moves
|
||||
foreach(PropagatorJob* it, directoriesToRemove) {
|
||||
_rootJob->append(it);
|
||||
}
|
||||
@@ -729,6 +746,97 @@ qint64 PropagateDirectory::committedDiskSpace() const
|
||||
return needed;
|
||||
}
|
||||
|
||||
|
||||
// ================================================================================
|
||||
|
||||
PropagatorJob::JobParallelism PropagateDataTransfers::parallelism()
|
||||
{
|
||||
// Upload and Download jobs parallelism is FullParallelism
|
||||
|
||||
return FullParallelism;
|
||||
}
|
||||
|
||||
|
||||
bool PropagateDataTransfers::scheduleNextJob()
|
||||
{
|
||||
if (_state == Finished) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_state == NotYetStarted) {
|
||||
_state = Running;
|
||||
|
||||
// at the begining of the Directory Job, update expected number of Jobs to be synced
|
||||
_totalJobs = _subJobs.count();
|
||||
|
||||
if (_subJobs.isEmpty()) {
|
||||
finalize();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
QMutableListIterator<PropagatorJob *> subJobsIterator(_subJobs);
|
||||
|
||||
while (subJobsIterator.hasNext()) {
|
||||
subJobsIterator.next();
|
||||
// get the state of the state of the sub job pointed by call next()
|
||||
// peekPrevious() will directly access the item through hash in the QList at that subjob
|
||||
if (subJobsIterator.peekPrevious()->_state == Finished) {
|
||||
// if this items is finish, remove it from the _subJobs list as it is not needed anymore
|
||||
subJobsIterator.remove();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (possiblyRunNextJob(subJobsIterator.peekPrevious())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Q_ASSERT(subJobsIterator.peekPrevious()->_state == Running);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void PropagateDataTransfers::slotSubJobFinished(SyncFileItem::Status status)
|
||||
{
|
||||
if (status == SyncFileItem::FatalError) {
|
||||
abort();
|
||||
_state = Finished;
|
||||
emit finished(status);
|
||||
return;
|
||||
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
|
||||
_hasError = status;
|
||||
}
|
||||
_runningNow--;
|
||||
_jobsFinished++;
|
||||
|
||||
// We finished processing all the jobs
|
||||
// check if we finished
|
||||
if (_jobsFinished >= _totalJobs) {
|
||||
Q_ASSERT(!_runningNow); // how can we be finished if there are still jobs running now
|
||||
finalize();
|
||||
} else {
|
||||
emit ready();
|
||||
}
|
||||
}
|
||||
|
||||
void PropagateDataTransfers::finalize()
|
||||
{
|
||||
_state = Finished;
|
||||
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
|
||||
}
|
||||
|
||||
qint64 PropagateDataTransfers::committedDiskSpace() const
|
||||
{
|
||||
qint64 needed = 0;
|
||||
foreach (PropagatorJob* job, _subJobs) {
|
||||
needed += job->committedDiskSpace();
|
||||
}
|
||||
return needed;
|
||||
}
|
||||
|
||||
|
||||
// ================================================================================
|
||||
|
||||
CleanupPollsJob::~CleanupPollsJob()
|
||||
{}
|
||||
|
||||
|
||||
@@ -178,7 +178,7 @@ public slots:
|
||||
|
||||
|
||||
/**
|
||||
* @brief Propagate a directory, and all its sub entries.
|
||||
* @brief Propagate a directory, and all its sub entries which are not uploads/downloads.
|
||||
* @ingroup libsync
|
||||
*/
|
||||
class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
|
||||
@@ -243,6 +243,60 @@ private slots:
|
||||
void slotSubJobFinished(SyncFileItem::Status status);
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Propagate new/updates of files to be uploaded/downloaded
|
||||
* @ingroup libsync
|
||||
*/
|
||||
class OWNCLOUDSYNC_EXPORT PropagateDataTransfers : public PropagatorJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
// all the file uploads and downloads
|
||||
QList<PropagatorJob *> _subJobs;
|
||||
|
||||
int _jobsFinished; // number of jobs that have completed
|
||||
int _runningNow; // number of subJobs running right now
|
||||
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
|
||||
int _totalJobs;
|
||||
|
||||
explicit PropagateDataTransfers(OwncloudPropagator *propagator)
|
||||
: PropagatorJob(propagator)
|
||||
, _jobsFinished(0), _runningNow(0), _hasError(SyncFileItem::NoStatus), _totalJobs(0)
|
||||
{ }
|
||||
|
||||
virtual ~PropagateDataTransfers() {
|
||||
qDeleteAll(_subJobs);
|
||||
}
|
||||
|
||||
void append(PropagatorJob *subJob) {
|
||||
_subJobs.append(subJob);
|
||||
}
|
||||
|
||||
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
|
||||
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
|
||||
virtual void abort() Q_DECL_OVERRIDE {
|
||||
foreach (PropagatorJob *j, _subJobs)
|
||||
j->abort();
|
||||
}
|
||||
|
||||
void finalize();
|
||||
|
||||
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
|
||||
|
||||
private slots:
|
||||
bool possiblyRunNextJob(PropagatorJob *next) {
|
||||
if (next->_state == NotYetStarted) {
|
||||
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
|
||||
connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
|
||||
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
|
||||
connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
|
||||
connect(next, SIGNAL(ready()), this, SIGNAL(ready()));
|
||||
_runningNow++;
|
||||
}
|
||||
return next->scheduleNextJob();
|
||||
}
|
||||
|
||||
void slotSubJobFinished(SyncFileItem::Status status);
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Dummy job that just mark it as completed and ignored
|
||||
|
||||
Referência em uma Nova Issue
Bloquear um usuário