Comparar commits

...

1 Commits

10 arquivos alterados com 318 adições e 3 exclusões
+1
Ver Arquivo
@@ -57,6 +57,7 @@ set(libsync_SRCS
propagateremotedelete.cpp
propagateremotemove.cpp
propagateremotemkdir.cpp
propagatefiles.cpp
syncengine.cpp
syncfilestatus.cpp
syncfilestatustracker.cpp
+9
Ver Arquivo
@@ -116,6 +116,15 @@ bool Capabilities::chunkingNg() const
return _capabilities["dav"].toMap()["chunking"].toByteArray() >= "1.0";
}
bool Capabilities::scheduling() const
{
static const auto scheduling = qgetenv("OWNCLOUD_SCHEDULING");
if (scheduling == "0") return false;
if (scheduling == "1") return true;
return _capabilities["dav"].toMap()["scheduling"].toByteArray() >= "1.0";
}
bool Capabilities::chunkingParallelUploadDisabled() const
{
return _capabilities["dav"].toMap()["chunkingParallelUploadDisabled"].toBool();
+1
Ver Arquivo
@@ -41,6 +41,7 @@ public:
int sharePublicLinkExpireDateDays() const;
bool shareResharing() const;
bool chunkingNg() const;
bool scheduling() const;
/// disable parallel upload in chunking
bool chunkingParallelUploadDisabled() const;
+7
Ver Arquivo
@@ -52,6 +52,7 @@ static const char updateCheckIntervalC[] = "updateCheckInterval";
static const char geometryC[] = "geometry";
static const char timeoutC[] = "timeout";
static const char chunkSizeC[] = "chunkSize";
static const char smallFileSizeC[] = "smallFileSize";
static const char proxyHostC[] = "Proxy/host";
static const char proxyTypeC[] = "Proxy/type";
@@ -128,6 +129,12 @@ quint64 ConfigFile::chunkSize() const
return settings.value(QLatin1String(chunkSizeC), 10*1000*1000).toLongLong(); // default to 10 MB
}
quint64 ConfigFile::smallFileSize() const
{
QSettings settings(configFile(), QSettings::IniFormat);
return settings.value(QLatin1String(smallFileSizeC), 500*1000).toLongLong(); // default to 500 kB
}
void ConfigFile::setOptionalDesktopNotifications(bool show)
{
QSettings settings(configFile(), QSettings::IniFormat);
+1
Ver Arquivo
@@ -113,6 +113,7 @@ public:
int timeout() const;
quint64 chunkSize() const;
quint64 smallFileSize() const;
void saveGeometry(QWidget *w);
void restoreGeometry(QWidget *w);
+32
Ver Arquivo
@@ -312,6 +312,13 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
directories.push(qMakePair(QString(), _rootJob.data()));
QVector<PropagatorJob*> directoriesToRemove;
QString removedDirectory;
// This needs to be changed before marging - capability to switch on/off scheduling, since
// server needs to decide if to do that - scheduling will also bring more features in the future
// TODO: change it before marging to -> bool enableScheduledRequests = account()->capabilities().scheduling();
bool enableScheduledRequests = true;
PropagateFiles* filesJob = new PropagateFiles(this);
foreach(const SyncFileItemPtr &item, items) {
if (!removedDirectory.isEmpty() && item->_file.startsWith(removedDirectory)) {
@@ -388,6 +395,9 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
currentDirJob->append(dir);
}
directories.push(qMakePair(item->destination() + "/" , dir));
} else if (enableScheduledRequests
&& (item->_instruction == CSYNC_INSTRUCTION_NEW || item->_instruction == CSYNC_INSTRUCTION_SYNC)) {
filesJob->append(item);
} else if (PropagateItemJob* current = createJob(item)) {
if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
// will delete directories, so defer execution
@@ -399,10 +409,18 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
}
}
if (enableScheduledRequests && !filesJob->isEmpty()){
// This job has parallelism WaitForFinished to allow directoriesToRemove be last
_rootJob->append(filesJob);
} else {
delete filesJob;
}
foreach(PropagatorJob* it, directoriesToRemove) {
_rootJob->append(it);
}
connect(_rootJob.data(), SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(_rootJob.data(), SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
@@ -458,6 +476,20 @@ quint64 OwncloudPropagator::chunkSize()
return chunkSize;
}
quint64 OwncloudPropagator::smallFileSize()
{
// A small filesize item is a file whose transfer time
// typically will be lower than its bookkeeping time.
static uint smallFileSize;
if (!smallFileSize) {
smallFileSize = qgetenv("OWNCLOUD_SMALLFILE_SIZE").toUInt();
if (smallFileSize == 0) {
ConfigFile cfg;
smallFileSize = cfg.smallFileSize();
}
}
return smallFileSize;
}
bool OwncloudPropagator::localFileNameClash( const QString& relFile )
{
+105 -1
Ver Arquivo
@@ -262,7 +262,6 @@ public:
class OwncloudPropagator : public QObject {
Q_OBJECT
PropagateItemJob *createJob(const SyncFileItemPtr& item);
QScopedPointer<PropagateDirectory> _rootJob;
public:
@@ -327,6 +326,7 @@ public:
/** returns the size of chunks in bytes */
static quint64 chunkSize();
static quint64 smallFileSize();
AccountPtr account() const;
@@ -342,7 +342,11 @@ public:
*/
DiskSpaceResult diskSpaceCheck() const;
PropagateItemJob *createJob(const SyncFileItemPtr& item);
int runningAtRootJob(){
return _rootJob.data()->_runningNow;
}
private slots:
@@ -415,6 +419,106 @@ private slots:
void slotPollFinished();
};
/**
* @brief The PropagateFiles class is a container class.
*
* It will also ensure proper bandwidth utilization vs bookkeeping balance
*
* @ingroup libsync
*
* State Machine:
*
* _________________________________________________ ___________________________________________
* | | |
* | (Empty DB items queue and populated Data items queue?) |
* | | | |
* | Yes | | No |
* | | | |
* |<-----------[Schedule Data job] (Empty Data items queue and populated DB items queue?) |
* | | | |
* | No | | Yes |
* | | | |
* | | [Schedule DB job]--------------->|
* | | |
* | | |
* | (Populated Data items queue and populated DB items queue?) |
* | | | |
* | Yes | | No |
* | | | |
* | (Active running Data items number exceeded limit?) [Finish - no items] |
* | | | |
* | No | | Yes |
* | | | |
* <---[Schedule Data job] [Schedule DB job]------------------------------------------>
*
*
*/
class PropagateFiles : public PropagatorJob {
Q_OBJECT
public:
QVector<PropagatorJob *> _subJobs;
QVector<SyncFileItemPtr> _syncDBItems; // Items which bookkeeping on the server is longer then the transfer of its payload
QVector<SyncFileItemPtr> _syncDataItems; // Items which transfer of the payload is longer then bookkeeping on the server
int _jobsFinished; // number of jobs that have completed
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
int _firstUnfinishedSubJob;
int _totalItems;
int _activeDBJobsNow;
int _activeDataJobsNow;
explicit PropagateFiles(OwncloudPropagator *propagator)
: PropagatorJob(propagator)
, _jobsFinished(0), _hasError(SyncFileItem::NoStatus), _firstUnfinishedSubJob(0), _totalItems(0), _activeDBJobsNow(0), _activeDataJobsNow(0) { }
virtual ~PropagateFiles() {
qDeleteAll(_subJobs);
}
bool isEmpty() {
return _syncDBItems.isEmpty() && _syncDataItems.isEmpty();
}
void append(const SyncFileItemPtr &item);
virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
bool scheduleNewJob(QVector<SyncFileItemPtr> &syncJobs);
bool scheduleNextItem();
virtual void abort() Q_DECL_OVERRIDE {
foreach (PropagatorJob *n, _subJobs)
n->abort();
}
void finalize();
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
JobParallelism parallelism() Q_DECL_OVERRIDE { return OCC::PropagatorJob::WaitForFinished; }
private slots:
bool possiblyRunNextJob(PropagatorJob *next) {
if (next->_state == NotYetStarted) {
connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection);
connect(next, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)),
this, SIGNAL(itemCompleted(const SyncFileItem &, const PropagatorJob &)));
connect(next, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
connect(next, SIGNAL(ready()), this, SIGNAL(ready()));
PropagateItemJob *job = qobject_cast<PropagateItemJob *>(next);
if(job->_item->_size <= propagator()->smallFileSize()){
_activeDBJobsNow++;
} else {
_activeDataJobsNow++;
}
}
return next->scheduleNextJob();
}
void slotSubJobFinished(SyncFileItem::Status status);
};
}
#endif
+1 -1
Ver Arquivo
@@ -115,7 +115,7 @@ public:
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
// We think it might finish quickly because it is a small file.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); }
/**
* Whether an existing folder with the same name may be deleted before
+160
Ver Arquivo
@@ -0,0 +1,160 @@
/*
* Copyright (C) by Piotr Mrowczynski <piotr@owncloud.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "owncloudpropagator.h"
namespace OCC {
qint64 PropagateFiles::committedDiskSpace() const
{
qint64 needed = 0;
foreach (PropagatorJob* job, _subJobs) {
needed += job->committedDiskSpace();
}
return needed;
}
void PropagateFiles::append(const SyncFileItemPtr &item)
{
_totalItems++;
if(item->_size <= propagator()->smallFileSize()){
_syncDBItems.append(item);
} else {
_syncDataItems.append(item);
}
}
bool PropagateFiles::scheduleNextJob()
{
if (_state == Finished) {
return false;
}
if (_state == NotYetStarted) {
_state = Running;
if (isEmpty()) {
finalize();
return true;
}
}
if (_state == Running) {
// This will ensure that all other jobs in the earlier PropagateDirectory are finished, so we can start with data transfers
if (propagator()->runningAtRootJob() != 1){
return false;
}
}
// cache the value of first unfinished subjob
int i = _firstUnfinishedSubJob;
int subJobsCount = _subJobs.count();
while (i < subJobsCount && _subJobs.at(i)->_state == Finished) {
_firstUnfinishedSubJob = ++i;
}
for (int i = _firstUnfinishedSubJob; i < subJobsCount; ++i) {
if (_subJobs.at(i)->_state == Finished) {
continue;
}
if (possiblyRunNextJob(_subJobs.at(i))) {
return true;
}
Q_ASSERT(_subJobs.at(i)->_state == Running);
}
return scheduleNextItem();
}
bool PropagateFiles::scheduleNewJob(QVector<SyncFileItemPtr> &syncJobs){
// This function is used to schedule new job and lazily create job from sync items
Q_ASSERT(!syncJobs.isEmpty());
// Equivalent to Qt5 takeFirst()
const SyncFileItemPtr &item = syncJobs.first();
PropagateItemJob* job = propagator()->createJob(item);
_subJobs.append(job);
syncJobs.pop_front();
return possiblyRunNextJob(job);
}
bool PropagateFiles::scheduleNextItem()
{
/// This function holds the whole bookkeeping/data-transfers balance logic
bool syncDBItemsEmpty = _syncDBItems.isEmpty();
bool syncDataItemsEmpty = _syncDataItems.isEmpty();
if (syncDBItemsEmpty && !syncDataItemsEmpty){
// There are no more DB jobs, ensure to maximally parallelise Data Transfers now
return scheduleNewJob(_syncDataItems);
} else if (!syncDBItemsEmpty && syncDataItemsEmpty){
// There are no more data transfer jobs, ensure to maximally parallelise DB jobs now
return scheduleNewJob(_syncDBItems);
} else if (!syncDBItemsEmpty && !syncDataItemsEmpty){
// Both queues have items, ensure bookkeeping and data transfer balance
if (_activeDataJobsNow < 2){
// By default, we have max 3 connections available for bigger files
// On the other hand, we have max 6 for isLikelyToFinishQuickly files (also small files)
// It makes sense to use 2 queues for bigger data transfers, and leave the remaining 1-4 for faster operations
return scheduleNewJob(_syncDataItems);
} else {
return scheduleNewJob(_syncDBItems);
}
}
// This means that we have no more file-items to sync -> finish
return false;
}
void PropagateFiles::slotSubJobFinished(SyncFileItem::Status status)
{
if (status == SyncFileItem::FatalError) {
abort();
_state = Finished;
emit finished(status);
return;
} else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) {
_hasError = status;
}
PropagateItemJob *job = qobject_cast<PropagateItemJob *>(sender());
if(job->_item->_size <= propagator()->smallFileSize()){
_activeDBJobsNow--;
} else {
_activeDataJobsNow--;
}
Q_ASSERT(job);
_jobsFinished++;
// We finished processing all the jobs
// check if we finished
if (_jobsFinished >= _totalItems) {
Q_ASSERT(!_activeDBJobsNow && !_activeDataJobsNow); // how can we be finished if there are still jobs running now
finalize();
} else {
emit ready();
}
}
void PropagateFiles::finalize()
{
_state = Finished;
emit finished(_hasError == SyncFileItem::NoStatus ? SyncFileItem::Success : _hasError);
}
}
+1 -1
Ver Arquivo
@@ -209,7 +209,7 @@ public:
void start() Q_DECL_OVERRIDE;
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < 100*1024; }
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return _item->_size < propagator()->smallFileSize(); }
private slots:
void slotComputeContentChecksum();