From 1bb4940e14e504df6b76103162a4dbbd4316a0bc Mon Sep 17 00:00:00 2001 From: Neeraj Gupta <254676+ua741@users.noreply.github.com> Date: Wed, 7 Aug 2024 16:46:02 +0530 Subject: [PATCH] Handle deletion from inFlight replica --- server/ente/filedata/filedata.go | 1 + server/pkg/controller/filedata/delete.go | 7 +- server/pkg/repo/filedata/repository.go | 118 +++++++++++++++-------- 3 files changed, 83 insertions(+), 43 deletions(-) diff --git a/server/ente/filedata/filedata.go b/server/ente/filedata/filedata.go index 807f2b5e6b..20ff30aea1 100644 --- a/server/ente/filedata/filedata.go +++ b/server/ente/filedata/filedata.go @@ -40,6 +40,7 @@ type Row struct { LatestBucket string ReplicatedBuckets []string DeleteFromBuckets []string + InflightReplicas []string PendingSync bool IsDeleted bool LastSyncTime int64 diff --git a/server/pkg/controller/filedata/delete.go b/server/pkg/controller/filedata/delete.go index e9ada5a0f8..1673e7c0fc 100644 --- a/server/pkg/controller/filedata/delete.go +++ b/server/pkg/controller/filedata/delete.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ente-io/museum/ente/filedata" "github.com/ente-io/museum/pkg/repo" + fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata" "github.com/ente-io/museum/pkg/utils/time" log "github.com/sirupsen/logrus" "strconv" @@ -70,7 +71,7 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) { return } } - dbErr := c.Repo.RemoveBucketFromDeletedBuckets(fileDataRow, bucketID) + dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.DeletionColumn) if dbErr != nil { ctxLogger.WithError(dbErr).Error("Failed to remove from db") return @@ -86,7 +87,7 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) { return } } - dbErr := c.Repo.RemoveBucketFromReplicatedBuckets(fileDataRow, bucketID) + dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.ReplicationColumn) if dbErr != nil { ctxLogger.WithError(dbErr).Error("Failed to remove from db") return @@ -100,7 +101,7 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) { return } } - dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow.FileID, fileDataRow.Type, fileDataRow.LatestBucket) + dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow) if dbErr != nil { ctxLogger.WithError(dbErr).Error("Failed to remove from db") return diff --git a/server/pkg/repo/filedata/repository.go b/server/pkg/repo/filedata/repository.go index 8b56a76d33..75c409a6c6 100644 --- a/server/pkg/repo/filedata/repository.go +++ b/server/pkg/repo/filedata/repository.go @@ -3,6 +3,7 @@ package filedata import ( "context" "database/sql" + "fmt" "github.com/ente-io/museum/ente" "github.com/ente-io/museum/ente/filedata" "github.com/ente-io/stacktrace" @@ -14,6 +15,12 @@ type Repository struct { DB *sql.DB } +const ( + ReplicationColumn = "replicated_buckets" + DeletionColumn = "delete_from_buckets" + InflightRepColumn = "inflight_rep_buckets" +) + func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) error { query := ` INSERT INTO file_data @@ -26,7 +33,7 @@ func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) erro delete_from_buckets = array( SELECT DISTINCT elem FROM unnest( array_append( - array_cat(file_data.replicated_buckets, file_data.delete_from_buckets), + array_cat(array_cat(file_data.replicated_buckets, file_data.delete_from_buckets), file_data.inflight_rep_buckets), CASE WHEN file_data.latest_bucket != EXCLUDED.latest_bucket THEN file_data.latest_bucket END ) ) AS elem @@ -45,7 +52,7 @@ func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) erro } func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fileIDs []int64) ([]filedata.Row, error) { - rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at + rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at FROM file_data WHERE data_type = $1 AND file_id = ANY($2)`, string(oType), pq.Array(fileIDs)) if err != nil { @@ -55,7 +62,7 @@ func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fi } func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata.Row, error) { - rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at + rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at FROM file_data WHERE file_id = $1`, fileIDs) if err != nil { @@ -64,65 +71,97 @@ func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata return convertRowsToFilesData(rows) } -func (r *Repository) RemoveBucketFromDeletedBuckets(row filedata.Row, bucketID string) error { - query := ` - UPDATE file_data - SET delete_from_buckets = array( - SELECT DISTINCT elem FROM unnest( - array_remove( - file_data.delete_from_buckets, - $1 - ) - ) AS elem - WHERE elem IS NOT NULL - ) - WHERE file_id = $2 AND data_type = $3 and is_deleted = true` - result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type)) +func (r *Repository) AddBucket(row filedata.Row, bucketID string, columnName string) error { + query := fmt.Sprintf(` + UPDATE file_data + SET %s = array( + SELECT DISTINCT elem FROM unnest( + array_append(file_data.%s, $1) + ) AS elem + ) + WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName) + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID) if err != nil { - return stacktrace.Propagate(err, "failed to remove bucket from deleted buckets") + return stacktrace.Propagate(err, "failed to add bucket to "+columnName) } rowsAffected, err := result.RowsAffected() if err != nil { return stacktrace.Propagate(err, "") } if rowsAffected == 0 { - return stacktrace.NewError("bucket not removed from deleted buckets") + return stacktrace.NewError("bucket not added to " + columnName) } return nil } -func (r *Repository) RemoveBucketFromReplicatedBuckets(row filedata.Row, bucketID string) error { - query := ` - UPDATE file_data - SET replicated_buckets = array( - SELECT DISTINCT elem FROM unnest( - array_remove( - file_data.replicated_buckets, - $1 - ) - ) AS elem - WHERE elem IS NOT NULL - ) - WHERE file_id = $2 AND data_type = $3` - result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type)) +func (r *Repository) RemoveBucket(row filedata.Row, bucketID string, columnName string) error { + query := fmt.Sprintf(` + UPDATE file_data + SET %s = array( + SELECT DISTINCT elem FROM unnest( + array_remove( + file_data.%s, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ) + WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName) + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID) if err != nil { - return stacktrace.Propagate(err, "failed to remove bucket from replicated buckets") + return stacktrace.Propagate(err, "failed to remove bucket from "+columnName) } rowsAffected, err := result.RowsAffected() if err != nil { return stacktrace.Propagate(err, "") } if rowsAffected == 0 { - return stacktrace.NewError("bucket not removed from deleted buckets") + return stacktrace.NewError("bucket not removed from " + columnName) } return nil } -func (r *Repository) DeleteFileData(ctx context.Context, fileID int64, oType ente.ObjectType, latestBucketID string) error { +func (r *Repository) MoveBetweenBuckets(row filedata.Row, bucketID string, sourceColumn string, destColumn string) error { + query := fmt.Sprintf(` + UPDATE file_data + SET %s = array( + SELECT DISTINCT elem FROM unnest( + array_append( + file_data.%s, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ), + %s = array( + SELECT DISTINCT elem FROM unnest( + array_remove( + file_data.%s, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ) + WHERE file_id = $2 AND data_type = $3 and user_id = $4`, destColumn, destColumn, sourceColumn, sourceColumn) + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID) + if err != nil { + return stacktrace.Propagate(err, "failed to move bucket from "+sourceColumn+" to "+destColumn) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("bucket not moved from " + sourceColumn + " to " + destColumn) + } + return nil +} + +func (r *Repository) DeleteFileData(ctx context.Context, row filedata.Row) error { query := ` DELETE FROM file_data -WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = ARRAY[]::s3region[]` - res, err := r.DB.ExecContext(ctx, query, fileID, string(oType), latestBucketID) +WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND user_id = $4 AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = ARRAY[]::s3region[]` + res, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.LatestBucket, row.UserID) if err != nil { return stacktrace.Propagate(err, "") } @@ -141,12 +180,11 @@ func convertRowsToFilesData(rows *sql.Rows) ([]filedata.Row, error) { var filesData []filedata.Row for rows.Next() { var fileData filedata.Row - err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), &fileData.PendingSync, &fileData.IsDeleted, &fileData.LastSyncTime, &fileData.CreatedAt, &fileData.UpdatedAt) + err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.LastSyncTime, &fileData.CreatedAt, &fileData.UpdatedAt) if err != nil { return nil, stacktrace.Propagate(err, "") } filesData = append(filesData, fileData) } return filesData, nil - }