Comparar commits
1 Commits
| Autor | SHA1 | Data | |
|---|---|---|---|
| b6af59d8a7 |
@@ -57,6 +57,7 @@ set(libsync_SRCS
|
||||
propagateremotedelete.cpp
|
||||
propagateremotemove.cpp
|
||||
propagateremotemkdir.cpp
|
||||
propagatefiles.cpp
|
||||
syncengine.cpp
|
||||
syncfilestatus.cpp
|
||||
syncfilestatustracker.cpp
|
||||
|
||||
@@ -116,6 +116,15 @@ bool Capabilities::chunkingNg() const
|
||||
return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0";
|
||||
}
|
||||
|
||||
bool Capabilities::scheduling() const
|
||||
{
|
||||
static const auto scheduling = qgetenv("OWNCLOUD_SCHEDULING");
|
||||
if (scheduling == "0") return false;
|
||||
if (scheduling == "1") return true;
|
||||
|
||||
return _capabilities["dav"].toMap()["scheduling"].toByteArray() >= "1.0";
|
||||
}
|
||||
|
||||
bool Capabilities::chunkingParallelUploadDisabled() const
|
||||
{
|
||||
return _capabilities["dav"].toMap()["chunkingParallelUploadDisabled"].toBool();
|
||||
|
||||
@@ -41,6 +41,7 @@ public:
|
||||
int sharePublicLinkExpireDateDays() const;
|
||||
bool shareResharing() const;
|
||||
bool chunkingNg() const;
|
||||
bool scheduling() const;
|
||||
|
||||
/// disable parallel upload in chunking
|
||||
bool chunkingParallelUploadDisabled() const;
|
||||
|
||||
@@ -52,6 +52,7 @@ static const char updateCheckIntervalC[] = "updateCheckInterval";
|
||||
static const char geometryC[] = "geometry";
|
||||
static const char timeoutC[] = "timeout";
|
||||
static const char chunkSizeC[] = "chunkSize";
|
||||
static const char smallFileSizeC[] = "smallFileSize";
|
||||
|
||||
static const char proxyHostC[] = "Proxy/host";
|
||||
static const char proxyTypeC[] = "Proxy/type";
|
||||
@@ -128,6 +129,12 @@ quint64 ConfigFile::chunkSize() const
|
||||
return settings.value(QLatin1String(chunkSizeC), 10*1000*1000).toLongLong(); // default to 10 MB
|
||||
}
|
||||
|
||||
quint64 ConfigFile::smallFileSize() const
|
||||
{
|
||||
QSettings settings(configFile(), QSettings::IniFormat);
|
||||
return settings.value(QLatin1String(smallFileSizeC), 500*1000).toLongLong(); // default to 500 kB
|
||||
}
|
||||
|
||||
void ConfigFile::setOptionalDesktopNotifications(bool show)
|
||||
{
|
||||
QSettings settings(configFile(), QSettings::IniFormat);
|
||||
|
||||
@@ -113,6 +113,7 @@ public:
|
||||
|
||||
int timeout() const;
|
||||
quint64 chunkSize() const;
|
||||
quint64 smallFileSize() const;
|
||||
|
||||
void saveGeometry(QWidget *w);
|
||||
void restoreGeometry(QWidget *w);
|
||||
|
||||
@@ -312,6 +312,13 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
directories.push(qMakePair(QString(), _rootJob.data()));
|
||||
QVector<PropagatorJob*> directoriesToRemove;
|
||||
QString removedDirectory;
|
||||
|
||||
// This needs to be changed before marging - capability to switch on/off scheduling, since
|
||||
// server needs to decide if to do that - scheduling will also bring more features in the future
|
||||
// TODO: change it before marging to -> bool enableScheduledRequests = account()->capabilities().scheduling();
|
||||
bool enableScheduledRequests = true;
|
||||
|
||||
PropagateFiles* filesJob = new PropagateFiles(this);
|
||||
foreach(const SyncFileItemPtr &item, items) {
|
||||
|
||||
if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) {
|
||||
@@ -388,6 +395,9 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
currentDirJob->append(dir);
|
||||
}
|
||||
directories.push(qMakePair(item->destination() + "/" , dir));
|
||||
} else if (enableScheduledRequests
|
||||
&& (item->_instruction == CSYNC_INSTRUCTION_NEW || item->_instruction == CSYNC_INSTRUCTION_SYNC)) {
|
||||
filesJob->append(item);
|
||||
} else if (PropagateItemJob* current = createJob(item)) {
|
||||
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
|
||||
// will delete directories, so defer execution
|
||||
@@ -399,10 +409,18 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
|
||||
}
|
||||
}
|
||||
|
||||
if (enableScheduledRequests && !filesJob->isEmpty()){
|
||||
// This job has parallelism WaitForFinished to allow directoriesToRemove be last
|
||||
_rootJob->append(filesJob);
|
||||
} else {
|
||||
delete filesJob;
|
||||
}
|
||||
|
||||
foreach(PropagatorJob* it, directoriesToRemove) {
|
||||
_rootJob->append(it);
|
||||
}
|
||||
|
||||
|
||||
connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
|
||||
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
|
||||
connect(_rootJob.data(), SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
|
||||
@@ -458,6 +476,20 @@ quint64 OwncloudPropagator::chunkSize()
|
||||
return chunkSize;
|
||||
}
|
||||
|
||||
quint64 OwncloudPropagator::smallFileSize()
|
||||
{
|
||||
// A small filesize item is a file whose transfer time
|
||||
// typically will be lower than its bookkeeping time.
|
||||
static uint smallFileSize;
|
||||
if (!smallFileSize) {
|
||||
smallFileSize = qgetenv("OWNCLOUD_SMALLFILE_SIZE").toUInt();
|
||||
if (smallFileSize == 0) {
|
||||
ConfigFile cfg;
|
||||
smallFileSize = cfg.smallFileSize();
|
||||
}
|
||||
}
|
||||
return smallFileSize;
|
||||
}
|
||||
|
||||
bool OwncloudPropagator::localFileNameClash( const QString& relFile )
|
||||
{
|
||||
|
||||
@@ -262,7 +262,6 @@ public:
|
||||
class OwncloudPropagator : public QObject {
|
||||
Q_OBJECT
|
||||
|
||||
PropagateItemJob *createJob(const SyncFileItemPtr& item);
|
||||
QScopedPointer<PropagateDirectory> _rootJob;
|
||||
|
||||
public:
|
||||
@@ -327,6 +326,7 @@ public:
|
||||
|
||||
/** returns the size of chunks in bytes */
|
||||
static quint64 chunkSize();
|
||||
static quint64 smallFileSize();
|
||||
|
||||
AccountPtr account() const;
|
||||
|
||||
@@ -342,7 +342,11 @@ public:
|
||||
*/
|
||||
DiskSpaceResult diskSpaceCheck() const;
|
||||
|
||||
PropagateItemJob *createJob(const SyncFileItemPtr& item);
|
||||
|
||||
int runningAtRootJob(){
|
||||
return _rootJob.data()->_runningNow;
|
||||
}
|
||||
|
||||
private slots:
|
||||
|
||||
@@ -415,6 +419,106 @@ private slots:
|
||||
void slotPollFinished();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief The PropagateFiles class is a container class.
|
||||
*
|
||||
* It will also ensure proper bandwidth utilization vs bookkeeping balance
|
||||
*
|
||||
* @ingroup libsync
|
||||
*
|
||||
* State Machine:
|
||||
*
|
||||
* _________________________________________________ ___________________________________________
|
||||
* | | |
|
||||
* | (Empty DB items queue and populated Data items queue?) |
|
||||
* | | | |
|
||||
* | Yes | | No |
|
||||
* | | | |
|
||||
* |<-----------[Schedule Data job] (Empty Data items queue and populated DB items queue?) |
|
||||
* | | | |
|
||||
* | No | | Yes |
|
||||
* | | | |
|
||||
* | | [Schedule DB job]--------------->|
|
||||
* | | |
|
||||
* | | |
|
||||
* | (Populated Data items queue and populated DB items queue?) |
|
||||
* | | | |
|
||||
* | Yes | | No |
|
||||
* | | | |
|
||||
* | (Active running Data items number exceeded limit?) [Finish - no items] |
|
||||
* | | | |
|
||||
* | No | | Yes |
|
||||
* | | | |
|
||||
* <---[Schedule Data job] [Schedule DB job]------------------------------------------>
|
||||
*
|
||||
*
|
||||
*/
|
||||
class PropagateFiles : public PropagatorJob {
|
||||
Q_OBJECT
|
||||
public:
|
||||
QVector<PropagatorJob *> _subJobs;
|
||||
|
||||
QVector<SyncFileItemPtr> _syncDBItems; // Items which bookkeeping on the server is longer then the transfer of its payload
|
||||
|
||||
QVector<SyncFileItemPtr> _syncDataItems; // Items which transfer of the payload is longer then bookkeeping on the server
|
||||
|
||||
int _jobsFinished; // number of jobs that have completed
|
||||
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
|
||||
int _firstUnfinishedSubJob;
|
||||
int _totalItems;
|
||||
int _activeDBJobsNow;
|
||||
int _activeDataJobsNow;
|
||||
|
||||
explicit PropagateFiles(OwncloudPropagator *propagator)
|
||||
: PropagatorJob(propagator)
|
||||
, _jobsFinished(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0), _totalItems(0), _activeDBJobsNow(0), _activeDataJobsNow(0) { }
|
||||
|
||||
virtual ~PropagateFiles() {
|
||||
qDeleteAll(_subJobs);
|
||||
}
|
||||
|
||||
bool isEmpty() {
|
||||
return _syncDBItems.isEmpty() && _syncDataItems.isEmpty();
|
||||
}
|
||||
|
||||
void append(const SyncFileItemPtr &item);
|
||||
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
|
||||
bool scheduleNewJob(QVector<SyncFileItemPtr> &syncJobs);
|
||||
bool scheduleNextItem();
|
||||
|
||||
virtual void abort() Q_DECL_OVERRIDE {
|
||||
foreach (PropagatorJob *n, _subJobs)
|
||||
n->abort();
|
||||
}
|
||||
|
||||
void finalize();
|
||||
|
||||
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
|
||||
|
||||
JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinished; }
|
||||
|
||||
private slots:
|
||||
bool possiblyRunNextJob(PropagatorJob *next) {
|
||||
if (next->_state == NotYetStarted) {
|
||||
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
|
||||
connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
|
||||
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
|
||||
connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
|
||||
connect(next, SIGNAL(ready()), this, SIGNAL(ready()));
|
||||
|
||||
PropagateItemJob *job = qobject_cast<PropagateItemJob *>(next);
|
||||
if(job->_item->_size <= propagator()->smallFileSize()){
|
||||
_activeDBJobsNow++;
|
||||
} else {
|
||||
_activeDataJobsNow++;
|
||||
}
|
||||
}
|
||||
return next->scheduleNextJob();
|
||||
}
|
||||
|
||||
void slotSubJobFinished(SyncFileItem::Status status);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@@ -115,7 +115,7 @@ public:
|
||||
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
|
||||
|
||||
// We think it might finish quickly because it is a small file.
|
||||
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
|
||||
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); }
|
||||
|
||||
/**
|
||||
* Whether an existing folder with the same name may be deleted before
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
/*
|
||||
* Copyright (C) by Piotr Mrowczynski <piotr@owncloud.com>
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation; either version 2 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but
|
||||
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* for more details.
|
||||
*/
|
||||
|
||||
#include "owncloudpropagator.h"
|
||||
|
||||
namespace OCC {
|
||||
|
||||
|
||||
qint64 PropagateFiles::committedDiskSpace() const
|
||||
{
|
||||
qint64 needed = 0;
|
||||
foreach (PropagatorJob* job, _subJobs) {
|
||||
needed += job->committedDiskSpace();
|
||||
}
|
||||
return needed;
|
||||
}
|
||||
|
||||
void PropagateFiles::append(const SyncFileItemPtr &item)
|
||||
{
|
||||
_totalItems++;
|
||||
if(item->_size <= propagator()->smallFileSize()){
|
||||
_syncDBItems.append(item);
|
||||
} else {
|
||||
_syncDataItems.append(item);
|
||||
}
|
||||
}
|
||||
|
||||
bool PropagateFiles::scheduleNextJob()
|
||||
{
|
||||
if (_state == Finished) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (_state == NotYetStarted) {
|
||||
_state = Running;
|
||||
|
||||
if (isEmpty()) {
|
||||
finalize();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (_state == Running) {
|
||||
// This will ensure that all other jobs in the earlier PropagateDirectory are finished, so we can start with data transfers
|
||||
if (propagator()->runningAtRootJob() != 1){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// cache the value of first unfinished subjob
|
||||
int i = _firstUnfinishedSubJob;
|
||||
int subJobsCount = _subJobs.count();
|
||||
while (i < subJobsCount && _subJobs.at(i)->_state == Finished) {
|
||||
_firstUnfinishedSubJob = ++i;
|
||||
}
|
||||
for (int i = _firstUnfinishedSubJob; i < subJobsCount; ++i) {
|
||||
if (_subJobs.at(i)->_state == Finished) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (possiblyRunNextJob(_subJobs.at(i))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Q_ASSERT(_subJobs.at(i)->_state == Running);
|
||||
}
|
||||
|
||||
return scheduleNextItem();
|
||||
}
|
||||
|
||||
bool PropagateFiles::scheduleNewJob(QVector<SyncFileItemPtr> &syncJobs){
|
||||
// This function is used to schedule new job and lazily create job from sync items
|
||||
Q_ASSERT(!syncJobs.isEmpty());
|
||||
|
||||
// Equivalent to Qt5 takeFirst()
|
||||
const SyncFileItemPtr &item = syncJobs.first();
|
||||
PropagateItemJob* job = propagator()->createJob(item);
|
||||
_subJobs.append(job);
|
||||
syncJobs.pop_front();
|
||||
return possiblyRunNextJob(job);
|
||||
}
|
||||
|
||||
bool PropagateFiles::scheduleNextItem()
|
||||
{
|
||||
/// This function holds the whole bookkeeping/data-transfers balance logic
|
||||
|
||||
bool syncDBItemsEmpty = _syncDBItems.isEmpty();
|
||||
bool syncDataItemsEmpty = _syncDataItems.isEmpty();
|
||||
|
||||
if (syncDBItemsEmpty && !syncDataItemsEmpty){
|
||||
// There are no more DB jobs, ensure to maximally parallelise Data Transfers now
|
||||
return scheduleNewJob(_syncDataItems);
|
||||
} else if (!syncDBItemsEmpty && syncDataItemsEmpty){
|
||||
// There are no more data transfer jobs, ensure to maximally parallelise DB jobs now
|
||||
return scheduleNewJob(_syncDBItems);
|
||||
} else if (!syncDBItemsEmpty && !syncDataItemsEmpty){
|
||||
// Both queues have items, ensure bookkeeping and data transfer balance
|
||||
if (_activeDataJobsNow < 2){
|
||||
// By default, we have max 3 connections available for bigger files
|
||||
// On the other hand, we have max 6 for isLikelyToFinishQuickly files (also small files)
|
||||
// It makes sense to use 2 queues for bigger data transfers, and leave the remaining 1-4 for faster operations
|
||||
return scheduleNewJob(_syncDataItems);
|
||||
} else {
|
||||
return scheduleNewJob(_syncDBItems);
|
||||
}
|
||||
}
|
||||
|
||||
// This means that we have no more file-items to sync -> finish
|
||||
return false;
|
||||
}
|
||||
|
||||
void PropagateFiles::slotSubJobFinished(SyncFileItem::Status status)
|
||||
{
|
||||
if (status == SyncFileItem::FatalError) {
|
||||
abort();
|
||||
_state = Finished;
|
||||
emit finished(status);
|
||||
return;
|
||||
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
|
||||
_hasError = status;
|
||||
}
|
||||
|
||||
PropagateItemJob *job = qobject_cast<PropagateItemJob *>(sender());
|
||||
if(job->_item->_size <= propagator()->smallFileSize()){
|
||||
_activeDBJobsNow--;
|
||||
} else {
|
||||
_activeDataJobsNow--;
|
||||
}
|
||||
Q_ASSERT(job);
|
||||
|
||||
_jobsFinished++;
|
||||
|
||||
// We finished processing all the jobs
|
||||
// check if we finished
|
||||
if (_jobsFinished >= _totalItems) {
|
||||
Q_ASSERT(!_activeDBJobsNow && !_activeDataJobsNow); // how can we be finished if there are still jobs running now
|
||||
finalize();
|
||||
} else {
|
||||
emit ready();
|
||||
}
|
||||
}
|
||||
|
||||
void PropagateFiles::finalize()
|
||||
{
|
||||
_state = Finished;
|
||||
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -209,7 +209,7 @@ public:
|
||||
|
||||
void start() Q_DECL_OVERRIDE;
|
||||
|
||||
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
|
||||
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); }
|
||||
|
||||
private slots:
|
||||
void slotComputeContentChecksum();
|
||||
|
||||
Referência em uma Nova Issue
Bloquear um usuário