[sever][replication] Avoid long txn during replication

This commit is contained in:
Neeraj Gupta 2024-08-12 15:02:20 +05:30
parent 6266d16544
commit 992ac53a27
3 changed files with 39 additions and 67 deletions

View File

@ -222,7 +222,7 @@ func (c *ReplicationController3) replicate(i int) {
// objects left to replicate currently.
func (c *ReplicationController3) tryReplicate() error {
// Fetch an object to replicate
tx, copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
copies, err := c.ObjectCopiesRepo.GetAndLockUnreplicatedObject()
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Errorf("Could not fetch an object to replicate: %s", err)
@ -237,44 +237,16 @@ func (c *ReplicationController3) tryReplicate() error {
"object_key": objectKey,
})
commit := func(err error) error {
// We don't rollback the transaction even in the case of errors, and
// instead try to commit it after setting the last_attempt timestamp.
//
// This avoids the replication getting stuck in a loop trying (and
// failing) to replicate the same object. The error would still need to
// be resolved, but at least the replication would meanwhile move
// forward, ignoring this row.
done := func(err error) error {
if err != nil {
logger.Error(err)
}
aerr := c.ObjectCopiesRepo.RegisterReplicationAttempt(tx, objectKey)
if aerr != nil {
aerr = stacktrace.Propagate(aerr, "Failed to mark replication attempt")
logger.Error(aerr)
}
cerr := tx.Commit()
if cerr != nil {
cerr = stacktrace.Propagate(err, "Failed to commit transaction")
logger.Error(cerr)
}
if err == nil {
err = aerr
}
if err == nil {
err = cerr
}
if err == nil {
logger.Info("Replication attempt succeeded")
} else {
logger.Info("Replication attempt failed")
logger.WithError(err).Info("Replication attempt failed")
}
return err
}
@ -282,30 +254,30 @@ func (c *ReplicationController3) tryReplicate() error {
if copies.B2 == nil {
err := errors.New("expected B2 copy to be in place before we start replication")
return commit(stacktrace.Propagate(err, "Sanity check failed"))
return done(stacktrace.Propagate(err, "Sanity check failed"))
}
if !copies.WantWasabi && !copies.WantSCW {
err := errors.New("expected at least one of want_wasabi and want_scw to be true when trying to replicate")
return commit(stacktrace.Propagate(err, "Sanity check failed"))
return done(stacktrace.Propagate(err, "Sanity check failed"))
}
ob, err := c.ObjectRepo.GetObjectState(tx, objectKey)
ob, err := c.ObjectRepo.GetObjectState(objectKey)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to fetch file's deleted status"))
return done(stacktrace.Propagate(err, "Failed to fetch file's deleted status"))
}
if ob.IsFileDeleted || ob.IsUserDeleted {
// Update the object_copies to mark this object as not requiring further
// replication. The row in object_copies will get deleted when the next
// scheduled object deletion runs.
err = c.ObjectCopiesRepo.UnmarkFromReplication(tx, objectKey)
err = c.ObjectCopiesRepo.UnmarkFromReplication(objectKey)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication"))
return done(stacktrace.Propagate(err, "Failed to mark an object not requiring further replication"))
}
logger.Infof("Skipping replication for deleted object (isFileDeleted = %v, isUserDeleted = %v)",
ob.IsFileDeleted, ob.IsUserDeleted)
return commit(nil)
return done(nil)
}
err = ensureSufficientSpace(ob.Size)
@ -316,19 +288,19 @@ func (c *ReplicationController3) tryReplicate() error {
//
// Log this error though, so that it gets noticed if it happens too
// frequently (the instance might need a bigger disk).
return commit(stacktrace.Propagate(err, ""))
return done(stacktrace.Propagate(err, ""))
}
filePath, file, err := c.createTemporaryFile(objectKey)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to create temporary file"))
return done(stacktrace.Propagate(err, "Failed to create temporary file"))
}
defer os.Remove(filePath)
defer file.Close()
size, err := c.downloadFromB2ViaWorker(objectKey, file, logger)
if err != nil {
return commit(stacktrace.Propagate(err, "Failed to download object from B2"))
return done(stacktrace.Propagate(err, "Failed to download object from B2"))
}
logger.Infof("Downloaded %d bytes to %s", size, filePath)
@ -343,21 +315,20 @@ func (c *ReplicationController3) tryReplicate() error {
if copies.WantWasabi && copies.Wasabi == nil {
werr := c.replicateFile(in, c.wasabiDest, func() error {
return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(tx, objectKey)
return c.ObjectCopiesRepo.MarkObjectReplicatedWasabi(objectKey)
})
err = werr
}
if copies.WantSCW && copies.SCW == nil {
serr := c.replicateFile(in, c.scwDest, func() error {
return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(tx, objectKey)
return c.ObjectCopiesRepo.MarkObjectReplicatedScaleway(objectKey)
})
if err == nil {
err = serr
}
}
return commit(err)
return done(err)
}
// Return an error if we risk running out of disk space if we try to download

View File

@ -200,8 +200,8 @@ func (repo *ObjectRepository) DoesObjectOrTempObjectExist(objectKey string) (boo
//
// Unknown objects (i.e. objectKeys for which there are no entries) are
// considered as deleted.
func (repo *ObjectRepository) GetObjectState(tx *sql.Tx, objectKey string) (ObjectState ente.ObjectState, err error) {
row := tx.QueryRow(`
func (repo *ObjectRepository) GetObjectState(objectKey string) (ObjectState ente.ObjectState, err error) {
row := repo.DB.QueryRow(`
SELECT ok.is_deleted, u.encrypted_email IS NULL AS is_user_deleted, ok.size
FROM object_keys ok
JOIN files f ON ok.file_id = f.file_id

View File

@ -18,17 +18,14 @@ type ObjectCopiesRepository struct {
}
// GetAndLockUnreplicatedObject gets an object which is not yet replicated to
// all the replicas. It also starts a transaction to keep the row corresponding
// to that object in the database locked.
// all the replicas. It also registers a replication to keep the row corresponding
// to that object to be blocked for 24h before next replication attemp.
//
// Both tx and objectCopies are guaranteed to be nil if error is not nil.
//
// If the returned transaction is not `nil`, it must be either `Rollback`ed or
// `Commit`ed.
func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*sql.Tx, *ente.ObjectCopies, error) {
// ObjectCopies is guaranteed to be nil if error is not nil.
func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*ente.ObjectCopies, error) {
tx, err := repo.DB.Begin()
if err != nil {
return nil, nil, stacktrace.Propagate(err, "")
return nil, stacktrace.Propagate(err, "")
}
rollback := func() {
@ -64,15 +61,19 @@ func (repo *ObjectCopiesRepository) GetAndLockUnreplicatedObject() (*sql.Tx, *en
if err != nil && errors.Is(err, sql.ErrNoRows) {
commit()
return nil, nil, err
return nil, err
}
if err != nil {
rollback()
return nil, nil, stacktrace.Propagate(err, "")
return nil, stacktrace.Propagate(err, "")
}
return tx, &r, nil
err = repo.RegisterReplicationAttempt(tx, r.ObjectKey)
if err != nil {
rollback()
return nil, stacktrace.Propagate(err, "failed to register replication attempt")
}
return &r, nil
}
// CreateNewB2Object creates a new entry for objectKey and marks it as having
@ -139,8 +140,8 @@ func (repo *ObjectCopiesRepository) ResetNeedsScalewayReplication(objectKey stri
// UnmarkFromReplication clears the want_* flags so that this objectKey is
// marked as not requiring further replication.
func (repo *ObjectCopiesRepository) UnmarkFromReplication(tx *sql.Tx, objectKey string) error {
_, err := tx.Exec(`
func (repo *ObjectCopiesRepository) UnmarkFromReplication(objectKey string) error {
_, err := repo.DB.Exec(`
UPDATE object_copies
SET want_b2 = false, want_wasabi = false, want_scw = false
WHERE object_key = $1
@ -150,24 +151,24 @@ func (repo *ObjectCopiesRepository) UnmarkFromReplication(tx *sql.Tx, objectKey
// MarkObjectReplicatedB2 sets the time when `objectKey` was replicated to
// Wasabi to the current timestamp.
func (repo *ObjectCopiesRepository) MarkObjectReplicatedWasabi(tx *sql.Tx, objectKey string) error {
func (repo *ObjectCopiesRepository) MarkObjectReplicatedWasabi(objectKey string) error {
return repo.markObjectReplicated(`
UPDATE object_copies SET wasabi = now_utc_micro_seconds()
WHERE object_key = $1
`, tx, objectKey)
`, objectKey)
}
// MarkObjectReplicatedScaleway sets the time when `objectKey` was replicated to
// Wasabi to the current timestamp.
func (repo *ObjectCopiesRepository) MarkObjectReplicatedScaleway(tx *sql.Tx, objectKey string) error {
func (repo *ObjectCopiesRepository) MarkObjectReplicatedScaleway(objectKey string) error {
return repo.markObjectReplicated(`
UPDATE object_copies SET scw = now_utc_micro_seconds()
WHERE object_key = $1
`, tx, objectKey)
`, objectKey)
}
func (repo *ObjectCopiesRepository) markObjectReplicated(query string, tx *sql.Tx, objectKey string) error {
result, err := tx.Exec(query, objectKey)
func (repo *ObjectCopiesRepository) markObjectReplicated(query string, objectKey string) error {
result, err := repo.DB.Exec(query, objectKey)
if err != nil {
return stacktrace.Propagate(err, "")
}