mirror of
https://github.com/ente-io/ente.git
synced 2025-08-08 07:28:26 +00:00
Handle deletion from inFlight replica
This commit is contained in:
parent
5f14057b65
commit
1bb4940e14
@ -40,6 +40,7 @@ type Row struct {
|
||||
LatestBucket string
|
||||
ReplicatedBuckets []string
|
||||
DeleteFromBuckets []string
|
||||
InflightReplicas []string
|
||||
PendingSync bool
|
||||
IsDeleted bool
|
||||
LastSyncTime int64
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/ente-io/museum/ente/filedata"
|
||||
"github.com/ente-io/museum/pkg/repo"
|
||||
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
|
||||
"github.com/ente-io/museum/pkg/utils/time"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"strconv"
|
||||
@ -70,7 +71,7 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) {
|
||||
return
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.RemoveBucketFromDeletedBuckets(fileDataRow, bucketID)
|
||||
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.DeletionColumn)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
|
||||
return
|
||||
@ -86,7 +87,7 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) {
|
||||
return
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.RemoveBucketFromReplicatedBuckets(fileDataRow, bucketID)
|
||||
dbErr := c.Repo.RemoveBucket(fileDataRow, bucketID, fileDataRepo.ReplicationColumn)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
|
||||
return
|
||||
@ -100,7 +101,7 @@ func (c *Controller) deleteFileData(qItem repo.QueueItem) {
|
||||
return
|
||||
}
|
||||
}
|
||||
dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow.FileID, fileDataRow.Type, fileDataRow.LatestBucket)
|
||||
dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow)
|
||||
if dbErr != nil {
|
||||
ctxLogger.WithError(dbErr).Error("Failed to remove from db")
|
||||
return
|
||||
|
@ -3,6 +3,7 @@ package filedata
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"github.com/ente-io/museum/ente"
|
||||
"github.com/ente-io/museum/ente/filedata"
|
||||
"github.com/ente-io/stacktrace"
|
||||
@ -14,6 +15,12 @@ type Repository struct {
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
const (
|
||||
ReplicationColumn = "replicated_buckets"
|
||||
DeletionColumn = "delete_from_buckets"
|
||||
InflightRepColumn = "inflight_rep_buckets"
|
||||
)
|
||||
|
||||
func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) error {
|
||||
query := `
|
||||
INSERT INTO file_data
|
||||
@ -26,7 +33,7 @@ func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) erro
|
||||
delete_from_buckets = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_append(
|
||||
array_cat(file_data.replicated_buckets, file_data.delete_from_buckets),
|
||||
array_cat(array_cat(file_data.replicated_buckets, file_data.delete_from_buckets), file_data.inflight_rep_buckets),
|
||||
CASE WHEN file_data.latest_bucket != EXCLUDED.latest_bucket THEN file_data.latest_bucket END
|
||||
)
|
||||
) AS elem
|
||||
@ -45,7 +52,7 @@ func (r *Repository) InsertOrUpdate(ctx context.Context, data filedata.Row) erro
|
||||
}
|
||||
|
||||
func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fileIDs []int64) ([]filedata.Row, error) {
|
||||
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
|
||||
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, inflight_rep_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
|
||||
FROM file_data
|
||||
WHERE data_type = $1 AND file_id = ANY($2)`, string(oType), pq.Array(fileIDs))
|
||||
if err != nil {
|
||||
@ -55,7 +62,7 @@ func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fi
|
||||
}
|
||||
|
||||
func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata.Row, error) {
|
||||
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
|
||||
rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets,inflight_rep_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
|
||||
FROM file_data
|
||||
WHERE file_id = $1`, fileIDs)
|
||||
if err != nil {
|
||||
@ -64,65 +71,97 @@ func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata
|
||||
return convertRowsToFilesData(rows)
|
||||
}
|
||||
|
||||
func (r *Repository) RemoveBucketFromDeletedBuckets(row filedata.Row, bucketID string) error {
|
||||
query := `
|
||||
UPDATE file_data
|
||||
SET delete_from_buckets = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_remove(
|
||||
file_data.delete_from_buckets,
|
||||
$1
|
||||
)
|
||||
) AS elem
|
||||
WHERE elem IS NOT NULL
|
||||
)
|
||||
WHERE file_id = $2 AND data_type = $3 and is_deleted = true`
|
||||
result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type))
|
||||
func (r *Repository) AddBucket(row filedata.Row, bucketID string, columnName string) error {
|
||||
query := fmt.Sprintf(`
|
||||
UPDATE file_data
|
||||
SET %s = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_append(file_data.%s, $1)
|
||||
) AS elem
|
||||
)
|
||||
WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName)
|
||||
result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID)
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "failed to remove bucket from deleted buckets")
|
||||
return stacktrace.Propagate(err, "failed to add bucket to "+columnName)
|
||||
}
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "")
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return stacktrace.NewError("bucket not removed from deleted buckets")
|
||||
return stacktrace.NewError("bucket not added to " + columnName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repository) RemoveBucketFromReplicatedBuckets(row filedata.Row, bucketID string) error {
|
||||
query := `
|
||||
UPDATE file_data
|
||||
SET replicated_buckets = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_remove(
|
||||
file_data.replicated_buckets,
|
||||
$1
|
||||
)
|
||||
) AS elem
|
||||
WHERE elem IS NOT NULL
|
||||
)
|
||||
WHERE file_id = $2 AND data_type = $3`
|
||||
result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type))
|
||||
func (r *Repository) RemoveBucket(row filedata.Row, bucketID string, columnName string) error {
|
||||
query := fmt.Sprintf(`
|
||||
UPDATE file_data
|
||||
SET %s = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_remove(
|
||||
file_data.%s,
|
||||
$1
|
||||
)
|
||||
) AS elem
|
||||
WHERE elem IS NOT NULL
|
||||
)
|
||||
WHERE file_id = $2 AND data_type = $3 and user_id = $4`, columnName, columnName)
|
||||
result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID)
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "failed to remove bucket from replicated buckets")
|
||||
return stacktrace.Propagate(err, "failed to remove bucket from "+columnName)
|
||||
}
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "")
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return stacktrace.NewError("bucket not removed from deleted buckets")
|
||||
return stacktrace.NewError("bucket not removed from " + columnName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repository) DeleteFileData(ctx context.Context, fileID int64, oType ente.ObjectType, latestBucketID string) error {
|
||||
func (r *Repository) MoveBetweenBuckets(row filedata.Row, bucketID string, sourceColumn string, destColumn string) error {
|
||||
query := fmt.Sprintf(`
|
||||
UPDATE file_data
|
||||
SET %s = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_append(
|
||||
file_data.%s,
|
||||
$1
|
||||
)
|
||||
) AS elem
|
||||
WHERE elem IS NOT NULL
|
||||
),
|
||||
%s = array(
|
||||
SELECT DISTINCT elem FROM unnest(
|
||||
array_remove(
|
||||
file_data.%s,
|
||||
$1
|
||||
)
|
||||
) AS elem
|
||||
WHERE elem IS NOT NULL
|
||||
)
|
||||
WHERE file_id = $2 AND data_type = $3 and user_id = $4`, destColumn, destColumn, sourceColumn, sourceColumn)
|
||||
result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type), row.UserID)
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "failed to move bucket from "+sourceColumn+" to "+destColumn)
|
||||
}
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "")
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return stacktrace.NewError("bucket not moved from " + sourceColumn + " to " + destColumn)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Repository) DeleteFileData(ctx context.Context, row filedata.Row) error {
|
||||
query := `
|
||||
DELETE FROM file_data
|
||||
WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = ARRAY[]::s3region[]`
|
||||
res, err := r.DB.ExecContext(ctx, query, fileID, string(oType), latestBucketID)
|
||||
WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND user_id = $4 AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = ARRAY[]::s3region[]`
|
||||
res, err := r.DB.ExecContext(ctx, query, row.FileID, string(row.Type), row.LatestBucket, row.UserID)
|
||||
if err != nil {
|
||||
return stacktrace.Propagate(err, "")
|
||||
}
|
||||
@ -141,12 +180,11 @@ func convertRowsToFilesData(rows *sql.Rows) ([]filedata.Row, error) {
|
||||
var filesData []filedata.Row
|
||||
for rows.Next() {
|
||||
var fileData filedata.Row
|
||||
err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), &fileData.PendingSync, &fileData.IsDeleted, &fileData.LastSyncTime, &fileData.CreatedAt, &fileData.UpdatedAt)
|
||||
err := rows.Scan(&fileData.FileID, &fileData.UserID, &fileData.Type, &fileData.Size, &fileData.LatestBucket, pq.Array(&fileData.ReplicatedBuckets), pq.Array(&fileData.DeleteFromBuckets), pq.Array(&fileData.InflightReplicas), &fileData.PendingSync, &fileData.IsDeleted, &fileData.LastSyncTime, &fileData.CreatedAt, &fileData.UpdatedAt)
|
||||
if err != nil {
|
||||
return nil, stacktrace.Propagate(err, "")
|
||||
}
|
||||
filesData = append(filesData, fileData)
|
||||
}
|
||||
return filesData, nil
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user