diff --git a/server/cmd/museum/main.go b/server/cmd/museum/main.go
index c56045e4a4..602342c584 100644
--- a/server/cmd/museum/main.go
+++ b/server/cmd/museum/main.go
@@ -6,6 +6,7 @@ import (
 	b64 "encoding/base64"
 	"fmt"
 	"github.com/ente-io/museum/pkg/controller/file_copy"
+	"github.com/ente-io/museum/pkg/controller/filedata"
 	"net/http"
 	"os"
 	"os/signal"
@@ -49,6 +50,7 @@ import (
 	castRepo "github.com/ente-io/museum/pkg/repo/cast"
 	"github.com/ente-io/museum/pkg/repo/datacleanup"
 	"github.com/ente-io/museum/pkg/repo/embedding"
+	fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
 	"github.com/ente-io/museum/pkg/repo/kex"
 	"github.com/ente-io/museum/pkg/repo/passkey"
 	"github.com/ente-io/museum/pkg/repo/remotestore"
@@ -163,6 +165,7 @@ func main() {
 	fileRepo := &repo.FileRepository{DB: db, S3Config: s3Config, QueueRepo: queueRepo, ObjectRepo: objectRepo, ObjectCleanupRepo: objectCleanupRepo, ObjectCopiesRepo: objectCopiesRepo, UsageRepo: usageRepo}
+	fileDataRepo := &fileDataRepo.Repository{DB: db}
 	familyRepo := &repo.FamilyRepository{DB: db}
 	trashRepo := &repo.TrashRepository{DB: db, ObjectRepo: objectRepo, FileRepo: fileRepo, QueueRepo: queueRepo}
 	publicCollectionRepo := repo.NewPublicCollectionRepository(db, viper.GetString("apps.public-albums"))
@@ -239,6 +242,9 @@ func main() {
 		FileRepo: fileRepo,
 	}
 
+	accessCtrl := access.NewAccessController(collectionRepo, fileRepo)
+	fileDataCtrl := filedata.New(fileDataRepo, accessCtrl, objectCleanupController, s3Config, queueRepo, taskLockingRepo, fileRepo, collectionRepo, hostName)
+
 	fileController := &controller.FileController{
 		FileRepo:   fileRepo,
 		ObjectRepo: objectRepo,
@@ -288,8 +294,6 @@ func main() {
 		JwtSecret: jwtSecretBytes,
 	}
 
-	accessCtrl := access.NewAccessController(collectionRepo, fileRepo)
-
 	collectionController := &controller.CollectionController{
 		CollectionRepo: collectionRepo,
 		AccessCtrl:     accessCtrl,
@@ -402,6 +406,7 @@ func main() {
 	fileHandler := &api.FileHandler{
 		Controller:   fileController,
 		FileCopyCtrl: fileCopyCtrl,
+		FileDataCtrl: fileDataCtrl,
 	}
 	privateAPI.GET("/files/upload-urls", fileHandler.GetUploadURLs)
 	privateAPI.GET("/files/multipart-upload-urls", fileHandler.GetMultipartUploadURLs)
@@ -707,7 +712,7 @@ func main() {
 	setupAndStartCrons(
 		userAuthRepo, publicCollectionRepo, twoFactorRepo, passkeysRepo, fileController, taskLockingRepo,
 		emailNotificationCtrl, trashController, pushController, objectController, dataCleanupController, storageBonusCtrl,
-		embeddingController, healthCheckHandler, kexCtrl, castDb)
+		embeddingController, fileDataCtrl, healthCheckHandler, kexCtrl, castDb)
 
 	// Create a new collector, the name will be used as a label on the metrics
 	collector := sqlstats.NewStatsCollector("prod_db", db)
@@ -836,6 +841,7 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR
 	dataCleanupCtrl *dataCleanupCtrl.DeleteUserCleanupController,
 	storageBonusCtrl *storagebonus.Controller,
 	embeddingCtrl *embeddingCtrl.Controller,
+	fileDataCtrl *filedata.Controller,
 	healthCheckHandler *api.HealthCheckHandler,
 	kexCtrl *kexCtrl.Controller,
 	castDb castRepo.Repository) {
@@ -879,8 +885,10 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR
 	schedule(c, "@every 2m", func() {
 		fileController.CleanupDeletedFiles()
 	})
+	fileDataCtrl.CleanUpDeletedFileData()
 	schedule(c, "@every 101s", func() {
 		embeddingCtrl.CleanupDeletedEmbeddings()
+		fileDataCtrl.CleanUpDeletedFileData()
 	})
 
 	schedule(c, "@every 10m", func() {
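Note on the cron wiring: `setupAndStartCrons` invokes `fileDataCtrl.CleanUpDeletedFileData()` once directly while registering the crons and then again on the 101-second schedule, and the controller (see `delete.go` below) fends off overlapping runs with a plain `cleanupCronRunning` bool. Since cron ticks fire on separate goroutines, a non-atomic flag can in principle race. A minimal, self-contained sketch of the same skip-if-busy guard built on `sync/atomic` follows; `guardedJob` and its fields are hypothetical names for illustration, not museum APIs:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// guardedJob skips a tick when the previous run is still in flight,
// mirroring the cleanupCronRunning check but with an atomic flag so two
// concurrent ticks cannot both observe "not running".
type guardedJob struct {
	running atomic.Bool
	job     func()
}

func (g *guardedJob) run() {
	if !g.running.CompareAndSwap(false, true) {
		fmt.Println("previous run still in flight; skipping this tick")
		return
	}
	defer g.running.Store(false)
	g.job()
}

func main() {
	g := &guardedJob{job: func() { time.Sleep(50 * time.Millisecond) }}
	for i := 0; i < 3; i++ {
		go g.run() // as if fired from a schedule like "@every 101s"
	}
	time.Sleep(200 * time.Millisecond)
}
```

`CompareAndSwap` makes check-and-set a single step, so two concurrent ticks can never both pass the guard.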
diff --git a/server/ente/filedata/path.go b/server/ente/filedata/path.go
index c656fce164..5dd4f616ef 100644
--- a/server/ente/filedata/path.go
+++ b/server/ente/filedata/path.go
@@ -5,13 +5,13 @@ import (
 	"github.com/ente-io/museum/ente"
 )
 
-// basePrefix returns the base prefix for all objects related to a file. To check if the file data is deleted,
+// BasePrefix returns the base prefix for all objects related to a file. To check if the file data is deleted,
 // ensure that there's no file in the S3 bucket with this prefix.
-func basePrefix(fileID int64, ownerID int64) string {
+func BasePrefix(fileID int64, ownerID int64) string {
 	return fmt.Sprintf("%d/file-data/%d/", ownerID, fileID)
 }
 
-func allObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
+func AllObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
 	switch oType {
 	case ente.PreviewVideo:
 		return []string{previewVideoPath(fileID, ownerID), previewVideoPlaylist(fileID, ownerID)}
@@ -26,7 +26,7 @@ func allObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
 }
 
 func previewVideoPath(fileID int64, ownerID int64) string {
-	return fmt.Sprintf("%s%s", basePrefix(fileID, ownerID), string(ente.PreviewVideo))
+	return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewVideo))
 }
 
 func previewVideoPlaylist(fileID int64, ownerID int64) string {
@@ -34,9 +34,9 @@ func previewVideoPlaylist(fileID int64, ownerID int64) string {
 }
 
 func previewImagePath(fileID int64, ownerID int64) string {
-	return fmt.Sprintf("%s%s", basePrefix(fileID, ownerID), string(ente.PreviewImage))
+	return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewImage))
 }
 
 func derivedMetaPath(fileID int64, ownerID int64) string {
-	return fmt.Sprintf("%s%s", basePrefix(fileID, ownerID), string(ente.DerivedMeta))
+	return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.DerivedMeta))
 }
diff --git a/server/migrations/89_derived_data_table.down.sql b/server/migrations/89_file_data_table.down.sql
similarity index 100%
rename from server/migrations/89_derived_data_table.down.sql
rename to server/migrations/89_file_data_table.down.sql
diff --git a/server/migrations/89_derived_data_table.up.sql b/server/migrations/89_file_data_table.up.sql
similarity index 96%
rename from server/migrations/89_derived_data_table.up.sql
rename to server/migrations/89_file_data_table.up.sql
index 53b0ae95f3..3012cea296 100644
--- a/server/migrations/89_derived_data_table.up.sql
+++ b/server/migrations/89_file_data_table.up.sql
@@ -1,3 +1,5 @@
+ALTER TYPE OBJECT_TYPE ADD VALUE 'derivedMeta';
+ALTER TYPE s3region ADD VALUE 'b5';
 -- Create the derived table
 CREATE TABLE IF NOT EXISTS file_data
 (
diff --git a/server/pkg/api/file_data.go b/server/pkg/api/file_data.go
index 81546b52ef..73ddeb6f27 100644
--- a/server/pkg/api/file_data.go
+++ b/server/pkg/api/file_data.go
@@ -18,6 +18,10 @@ func (f *FileHandler) PutFileData(ctx *gin.Context) {
 		ctx.JSON(http.StatusBadRequest, err)
 		return
 	}
+	if req.Version == nil {
+		version := 1
+		req.Version = &version
+	}
 	err := f.FileDataCtrl.InsertOrUpdate(ctx, &req)
 	if err != nil {
 		handler.Error(ctx, err)
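`PutFileData` treats `version` as optional by binding it to a pointer: `nil` means the client omitted the field (so it defaults to 1), while an explicit value survives untouched. A stdlib-only sketch of that pattern; `putFileDataRequest` here is an illustrative stand-in, not museum's actual request type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Version is a pointer so an omitted field decodes to nil rather than 0.
type putFileDataRequest struct {
	FileID  int64 `json:"fileID"`
	Version *int  `json:"version,omitempty"`
}

func main() {
	for _, body := range []string{
		`{"fileID": 42}`,               // omitted: expect default 1
		`{"fileID": 42, "version": 3}`, // explicit value wins
	} {
		var req putFileDataRequest
		if err := json.Unmarshal([]byte(body), &req); err != nil {
			panic(err)
		}
		if req.Version == nil { // same defaulting PutFileData performs
			v := 1
			req.Version = &v
		}
		fmt.Println(req.FileID, *req.Version)
	}
}
```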
diff --git a/server/pkg/controller/file.go b/server/pkg/controller/file.go
index 23d9c6e8f9..c001089d4d 100644
--- a/server/pkg/controller/file.go
+++ b/server/pkg/controller/file.go
@@ -284,8 +284,6 @@ func (c *FileController) GetUploadURLs(ctx context.Context, userID int64, count
 	return urls, nil
 }
 
-
-
 // GetFileURL verifies permissions and returns a presigned url to the requested file
 func (c *FileController) GetFileURL(ctx *gin.Context, userID int64, fileID int64) (string, error) {
 	err := c.verifyFileAccess(userID, fileID)
diff --git a/server/pkg/controller/filedata/delete.go b/server/pkg/controller/filedata/delete.go
new file mode 100644
index 0000000000..e9ada5a0f8
--- /dev/null
+++ b/server/pkg/controller/filedata/delete.go
@@ -0,0 +1,123 @@
+package filedata
+
+import (
+	"context"
+	"fmt"
+	"github.com/ente-io/museum/ente/filedata"
+	"github.com/ente-io/museum/pkg/repo"
+	"github.com/ente-io/museum/pkg/utils/time"
+	log "github.com/sirupsen/logrus"
+	"strconv"
+)
+
+// CleanUpDeletedFileData clears associated file data from the object store
+func (c *Controller) CleanUpDeletedFileData() {
+	log.Info("Cleaning up deleted file data")
+	if c.cleanupCronRunning {
+		log.Info("Skipping CleanUpDeletedFileData cron run as another instance is still running")
+		return
+	}
+	c.cleanupCronRunning = true
+	defer func() {
+		c.cleanupCronRunning = false
+	}()
+	items, err := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteFileDataQueue, 200)
+	if err != nil {
+		log.WithError(err).Error("Failed to fetch items from queue")
+		return
+	}
+	for _, i := range items {
+		c.deleteFileData(i)
+	}
+}
+
+func (c *Controller) deleteFileData(qItem repo.QueueItem) {
+	lockName := fmt.Sprintf("FileDataDelete:%s", qItem.Item)
+	lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName)
+	ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id)
+	if err != nil || !lockStatus {
+		ctxLogger.Warn("unable to acquire lock")
+		return
+	}
+	defer func() {
+		err = c.TaskLockingRepo.ReleaseLock(lockName)
+		if err != nil {
+			ctxLogger.Errorf("Error while releasing lock %s", err)
+		}
+	}()
+	ctxLogger.Debug("Deleting all file data")
+	fileID, parseErr := strconv.ParseInt(qItem.Item, 10, 64)
+	if parseErr != nil {
+		ctxLogger.WithError(parseErr).Error("Failed to parse fileID from queue item")
+		return
+	}
+	ownerID, err := c.FileRepo.GetOwnerID(fileID)
+	if err != nil {
+		ctxLogger.WithError(err).Error("Failed to fetch ownerID")
+		return
+	}
+	rows, err := c.Repo.GetFileData(context.Background(), fileID)
+	if err != nil {
+		ctxLogger.WithError(err).Error("Failed to fetch datacenters")
+		return
+	}
+	for i := range rows {
+		fileDataRow := rows[i]
+		objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
+		// Delete from delete/stale buckets
+		for j := range fileDataRow.DeleteFromBuckets {
+			bucketID := fileDataRow.DeleteFromBuckets[j]
+			for k := range objectKeys {
+				err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], bucketID)
+				if err != nil {
+					ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
+					return
+				}
+			}
+			dbErr := c.Repo.RemoveBucketFromDeletedBuckets(fileDataRow, bucketID)
+			if dbErr != nil {
+				ctxLogger.WithError(dbErr).Error("Failed to remove from db")
+				return
+			}
+		}
+		// Delete from replicated buckets
+		for j := range fileDataRow.ReplicatedBuckets {
+			bucketID := fileDataRow.ReplicatedBuckets[j]
+			for k := range objectKeys {
+				err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], bucketID)
+				if err != nil {
+					ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
+					return
+				}
+			}
+			dbErr := c.Repo.RemoveBucketFromReplicatedBuckets(fileDataRow, bucketID)
+			if dbErr != nil {
+				ctxLogger.WithError(dbErr).Error("Failed to remove from db")
+				return
+			}
+		}
+		// Delete from Latest bucket
+		for k := range objectKeys {
+			err = c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKeys[k], fileDataRow.LatestBucket)
+			if err != nil {
+				ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
+				return
+			}
+		}
+		dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow.FileID, fileDataRow.Type, fileDataRow.LatestBucket)
+		if dbErr != nil {
+			ctxLogger.WithError(dbErr).Error("Failed to remove from db")
+			return
+		}
+	}
+	if err != nil {
+		ctxLogger.WithError(err).Error("Failed to delete data")
+		return
+	}
+	err = c.QueueRepo.DeleteItem(repo.DeleteFileDataQueue, qItem.Item)
+	if err != nil {
+		ctxLogger.WithError(err).Error("Failed to remove item from the queue")
+		return
+	}
+	ctxLogger.Info("Successfully deleted all file data")
+}
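The per-row walk in `deleteFileData` above is deliberately ordered and resumable: stale copies (`DeleteFromBuckets`) go first, then replicas, then the latest bucket, and the queue item is removed only after the DB row itself is gone. Each successful S3 step is persisted before the next begins, so if any step fails the handler simply returns, the item stays queued, and a later cron run resumes with whatever buckets remain. A condensed, runnable sketch of that control flow; `row`, `deleteObjects`, and the key string are stand-ins for the real types:

```go
package main

import "fmt"

// row and deleteObjects are trimmed stand-ins for filedata.Row and
// ObjectCleanupController.DeleteObjectFromDataCenter.
type row struct {
	LatestBucket      string
	ReplicatedBuckets []string
	DeleteFromBuckets []string
}

func deleteObjects(keys []string, bucket string) error {
	fmt.Printf("delete %v from %s\n", keys, bucket)
	return nil
}

// purge mirrors deleteFileData's per-row walk: stale buckets, then
// replicas, then the latest bucket. The commented repo calls persist
// progress after each successful S3 step, so an error leaves the queue
// item in place and a later cron run resumes with the remaining buckets.
func purge(r row, keys []string) error {
	for _, b := range r.DeleteFromBuckets {
		if err := deleteObjects(keys, b); err != nil {
			return err
		}
		// Repo.RemoveBucketFromDeletedBuckets(row, b)
	}
	for _, b := range r.ReplicatedBuckets {
		if err := deleteObjects(keys, b); err != nil {
			return err
		}
		// Repo.RemoveBucketFromReplicatedBuckets(row, b)
	}
	if err := deleteObjects(keys, r.LatestBucket); err != nil {
		return err
	}
	// Repo.DeleteFileData(...) drops the row; only then does the caller
	// delete the queue item itself.
	return nil
}

func main() {
	r := row{LatestBucket: "b5", ReplicatedBuckets: []string{"b2"}, DeleteFromBuckets: []string{"b6"}}
	_ = purge(r, []string{"1/file-data/42/derivedMeta"})
}
```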
object from datacenter") + return + } + } + dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow.FileID, fileDataRow.Type, fileDataRow.LatestBucket) + if dbErr != nil { + ctxLogger.WithError(dbErr).Error("Failed to remove from db") + return + } + } + if err != nil { + ctxLogger.WithError(err).Error("Failed delete data") + return + } + err = c.QueueRepo.DeleteItem(repo.DeleteFileDataQueue, qItem.Item) + if err != nil { + ctxLogger.WithError(err).Error("Failed to remove item from the queue") + return + } + ctxLogger.Info("Successfully deleted all file data") +} diff --git a/server/pkg/controller/filedata/file_object.go b/server/pkg/controller/filedata/file_object.go deleted file mode 100644 index aa627fb82c..0000000000 --- a/server/pkg/controller/filedata/file_object.go +++ /dev/null @@ -1,5 +0,0 @@ -package filedata - -func (c *Controller) f() { - -} diff --git a/server/pkg/repo/filedata/repository.go b/server/pkg/repo/filedata/repository.go index 02d7ccb57b..8b56a76d33 100644 --- a/server/pkg/repo/filedata/repository.go +++ b/server/pkg/repo/filedata/repository.go @@ -52,6 +52,88 @@ func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fi return nil, stacktrace.Propagate(err, "") } return convertRowsToFilesData(rows) +} + +func (r *Repository) GetFileData(ctx context.Context, fileIDs int64) ([]filedata.Row, error) { + rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at + FROM file_data + WHERE file_id = $1`, fileIDs) + if err != nil { + return nil, stacktrace.Propagate(err, "") + } + return convertRowsToFilesData(rows) +} + +func (r *Repository) RemoveBucketFromDeletedBuckets(row filedata.Row, bucketID string) error { + query := ` + UPDATE file_data + SET delete_from_buckets = array( + SELECT DISTINCT elem FROM unnest( + array_remove( + file_data.delete_from_buckets, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ) + WHERE file_id = $2 AND data_type = $3 and is_deleted = true` + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type)) + if err != nil { + return stacktrace.Propagate(err, "failed to remove bucket from deleted buckets") + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("bucket not removed from deleted buckets") + } + return nil +} + +func (r *Repository) RemoveBucketFromReplicatedBuckets(row filedata.Row, bucketID string) error { + query := ` + UPDATE file_data + SET replicated_buckets = array( + SELECT DISTINCT elem FROM unnest( + array_remove( + file_data.replicated_buckets, + $1 + ) + ) AS elem + WHERE elem IS NOT NULL + ) + WHERE file_id = $2 AND data_type = $3` + result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type)) + if err != nil { + return stacktrace.Propagate(err, "failed to remove bucket from replicated buckets") + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return stacktrace.Propagate(err, "") + } + if rowsAffected == 0 { + return stacktrace.NewError("bucket not removed from deleted buckets") + } + return nil +} + +func (r *Repository) DeleteFileData(ctx context.Context, fileID int64, oType ente.ObjectType, latestBucketID string) error { + query := ` +DELETE FROM file_data +WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = 
diff --git a/server/pkg/repo/object.go b/server/pkg/repo/object.go
index 052278402d..fc02e2d25c 100644
--- a/server/pkg/repo/object.go
+++ b/server/pkg/repo/object.go
@@ -148,12 +148,21 @@ func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context
 	for _, fileID := range fileIDs {
 		embeddingsToBeDeleted = append(embeddingsToBeDeleted, strconv.FormatInt(fileID, 10))
 	}
+	_, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
 	err = repo.QueueRepo.AddItems(ctx, tx, DeleteEmbeddingsQueue, embeddingsToBeDeleted)
 	if err != nil {
 		return nil, stacktrace.Propagate(err, "")
 	}
+	err = repo.QueueRepo.AddItems(ctx, tx, DeleteFileDataQueue, embeddingsToBeDeleted)
+	if err != nil {
+		return nil, stacktrace.Propagate(err, "")
+	}
+
 	_, err = tx.ExecContext(ctx, `UPDATE object_keys SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
 	if err != nil {
 		return nil, stacktrace.Propagate(err, "")
diff --git a/server/pkg/repo/queue.go b/server/pkg/repo/queue.go
index 49544dbc8c..e4800aea9c 100644
--- a/server/pkg/repo/queue.go
+++ b/server/pkg/repo/queue.go
@@ -23,6 +23,7 @@ var itemDeletionDelayInMinMap = map[string]int64{
 	DropFileEncMedataQueue:    -1 * 24 * 60, // -ve value to ensure attributes are immediately removed
 	DeleteObjectQueue:         45 * 24 * 60, // 45 days in minutes
 	DeleteEmbeddingsQueue:     -1 * 24 * 60, // -ve value to ensure embeddings are immediately removed
+	DeleteFileDataQueue:       -1 * 24 * 60, // -ve value to ensure file-data is immediately removed
 	TrashCollectionQueueV3:    -1 * 24 * 60, // -ve value to ensure collections are immediately marked as trashed
 	TrashEmptyQueue:           -1 * 24 * 60, // -ve value to ensure empty trash request are processed in next cron run
 	RemoveComplianceHoldQueue: -1 * 24 * 60, // -ve value to ensure compliance hold is removed in next cron run
@@ -32,6 +33,7 @@ const (
 	DropFileEncMedataQueue string = "dropFileEncMetata"
 	DeleteObjectQueue      string = "deleteObject"
 	DeleteEmbeddingsQueue  string = "deleteEmbedding"
+	DeleteFileDataQueue    string = "deleteFileData"
 	OutdatedObjectsQueue   string = "outdatedObject"
 	// Deprecated: Keeping it till we clean up items from the queue DB.
 	TrashCollectionQueue string = "trashCollection"
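`DeleteFileDataQueue` gets a negative delay, so queued file-data items become eligible on the very next cleanup pass, while for example `DeleteObjectQueue` keeps a 45-day grace period as a safety net. Assuming `GetItemsReadyForDeletion` compares item age against this per-queue delay (the repo's actual query isn't shown in this diff), the eligibility rule reduces to:

```go
package main

import (
	"fmt"
	"time"
)

// readyForDeletion sketches the rule implied by itemDeletionDelayInMinMap:
// an item may be processed once created_at + delay has passed. A negative
// delay (e.g. -1 day for deleteFileData) makes items eligible immediately;
// deleteObject's 45-day delay keeps objects recoverable for a while.
func readyForDeletion(createdAt time.Time, delayMin int64, now time.Time) bool {
	return now.After(createdAt.Add(time.Duration(delayMin) * time.Minute))
}

func main() {
	created := time.Now()
	fmt.Println(readyForDeletion(created, -1*24*60, time.Now())) // true: eligible at once
	fmt.Println(readyForDeletion(created, 45*24*60, time.Now())) // false: 45-day grace
}
```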