[server] Handle fileData cleanup on file deletion

This commit is contained in:
Neeraj Gupta 2024-08-06 15:29:10 +05:30
parent 543aa6b9cf
commit ec91e75780
11 changed files with 236 additions and 16 deletions

View File

@ -6,6 +6,7 @@ import (
b64 "encoding/base64"
"fmt"
"github.com/ente-io/museum/pkg/controller/file_copy"
"github.com/ente-io/museum/pkg/controller/filedata"
"net/http"
"os"
"os/signal"
@ -49,6 +50,7 @@ import (
castRepo "github.com/ente-io/museum/pkg/repo/cast"
"github.com/ente-io/museum/pkg/repo/datacleanup"
"github.com/ente-io/museum/pkg/repo/embedding"
fileDataRepo "github.com/ente-io/museum/pkg/repo/filedata"
"github.com/ente-io/museum/pkg/repo/kex"
"github.com/ente-io/museum/pkg/repo/passkey"
"github.com/ente-io/museum/pkg/repo/remotestore"
@ -163,6 +165,7 @@ func main() {
fileRepo := &repo.FileRepository{DB: db, S3Config: s3Config, QueueRepo: queueRepo,
ObjectRepo: objectRepo, ObjectCleanupRepo: objectCleanupRepo,
ObjectCopiesRepo: objectCopiesRepo, UsageRepo: usageRepo}
fileDataRepo := &fileDataRepo.Repository{DB: db}
familyRepo := &repo.FamilyRepository{DB: db}
trashRepo := &repo.TrashRepository{DB: db, ObjectRepo: objectRepo, FileRepo: fileRepo, QueueRepo: queueRepo}
publicCollectionRepo := repo.NewPublicCollectionRepository(db, viper.GetString("apps.public-albums"))
@ -239,6 +242,9 @@ func main() {
FileRepo: fileRepo,
}
accessCtrl := access.NewAccessController(collectionRepo, fileRepo)
fileDataCtrl := filedata.New(fileDataRepo, accessCtrl, objectCleanupController, s3Config, queueRepo, taskLockingRepo, fileRepo, collectionRepo, hostName)
fileController := &controller.FileController{
FileRepo: fileRepo,
ObjectRepo: objectRepo,
@ -288,8 +294,6 @@ func main() {
JwtSecret: jwtSecretBytes,
}
accessCtrl := access.NewAccessController(collectionRepo, fileRepo)
collectionController := &controller.CollectionController{
CollectionRepo: collectionRepo,
AccessCtrl: accessCtrl,
@ -402,6 +406,7 @@ func main() {
fileHandler := &api.FileHandler{
Controller: fileController,
FileCopyCtrl: fileCopyCtrl,
FileDataCtrl: fileDataCtrl,
}
privateAPI.GET("/files/upload-urls", fileHandler.GetUploadURLs)
privateAPI.GET("/files/multipart-upload-urls", fileHandler.GetMultipartUploadURLs)
@ -707,7 +712,7 @@ func main() {
setupAndStartCrons(
userAuthRepo, publicCollectionRepo, twoFactorRepo, passkeysRepo, fileController, taskLockingRepo, emailNotificationCtrl,
trashController, pushController, objectController, dataCleanupController, storageBonusCtrl,
embeddingController, healthCheckHandler, kexCtrl, castDb)
embeddingController, fileDataCtrl, healthCheckHandler, kexCtrl, castDb)
// Create a new collector, the name will be used as a label on the metrics
collector := sqlstats.NewStatsCollector("prod_db", db)
@ -836,6 +841,7 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR
dataCleanupCtrl *dataCleanupCtrl.DeleteUserCleanupController,
storageBonusCtrl *storagebonus.Controller,
embeddingCtrl *embeddingCtrl.Controller,
fileDataCtrl *filedata.Controller,
healthCheckHandler *api.HealthCheckHandler,
kexCtrl *kexCtrl.Controller,
castDb castRepo.Repository) {
@ -879,8 +885,10 @@ func setupAndStartCrons(userAuthRepo *repo.UserAuthRepository, publicCollectionR
schedule(c, "@every 2m", func() {
fileController.CleanupDeletedFiles()
})
fileDataCtrl.CleanUpDeletedFileData()
schedule(c, "@every 101s", func() {
embeddingCtrl.CleanupDeletedEmbeddings()
fileDataCtrl.CleanUpDeletedFileData()
})
schedule(c, "@every 10m", func() {

View File

@ -5,13 +5,13 @@ import (
"github.com/ente-io/museum/ente"
)
// basePrefix returns the base prefix for all objects related to a file. To check if the file data is deleted,
// BasePrefix returns the base prefix for all objects related to a file. To check if the file data is deleted,
// ensure that there's no file in the S3 bucket with this prefix.
func basePrefix(fileID int64, ownerID int64) string {
func BasePrefix(fileID int64, ownerID int64) string {
return fmt.Sprintf("%d/file-data/%d/", ownerID, fileID)
}
func allObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
func AllObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
switch oType {
case ente.PreviewVideo:
return []string{previewVideoPath(fileID, ownerID), previewVideoPlaylist(fileID, ownerID)}
@ -26,7 +26,7 @@ func allObjects(fileID int64, ownerID int64, oType ente.ObjectType) []string {
}
func previewVideoPath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", basePrefix(fileID, ownerID), string(ente.PreviewVideo))
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewVideo))
}
func previewVideoPlaylist(fileID int64, ownerID int64) string {
@ -34,9 +34,9 @@ func previewVideoPlaylist(fileID int64, ownerID int64) string {
}
func previewImagePath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", basePrefix(fileID, ownerID), string(ente.PreviewImage))
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.PreviewImage))
}
func derivedMetaPath(fileID int64, ownerID int64) string {
return fmt.Sprintf("%s%s", basePrefix(fileID, ownerID), string(ente.DerivedMeta))
return fmt.Sprintf("%s%s", BasePrefix(fileID, ownerID), string(ente.DerivedMeta))
}

View File

@ -1,3 +1,5 @@
ALTER TYPE OBJECT_TYPE ADD VALUE 'derivedMeta';
ALTER TYPE s3region ADD VALUE 'b5';
-- Create the derived table
CREATE TABLE IF NOT EXISTS file_data
(

View File

@ -18,6 +18,11 @@ func (f *FileHandler) PutFileData(ctx *gin.Context) {
ctx.JSON(http.StatusBadRequest, err)
return
}
reqInt := &req
if reqInt.Version == nil {
version := 1
reqInt.Version = &version
}
err := f.FileDataCtrl.InsertOrUpdate(ctx, &req)
if err != nil {
handler.Error(ctx, err)

View File

@ -284,8 +284,6 @@ func (c *FileController) GetUploadURLs(ctx context.Context, userID int64, count
return urls, nil
}
// GetFileURL verifies permissions and returns a presigned url to the requested file
func (c *FileController) GetFileURL(ctx *gin.Context, userID int64, fileID int64) (string, error) {
err := c.verifyFileAccess(userID, fileID)

View File

@ -0,0 +1,119 @@
package filedata
import (
"context"
"fmt"
"github.com/ente-io/museum/ente/filedata"
"github.com/ente-io/museum/pkg/repo"
"github.com/ente-io/museum/pkg/utils/time"
log "github.com/sirupsen/logrus"
"strconv"
)
// CleanUpDeletedFileData drains the file-data deletion queue, removing the
// associated data from the object store. At most one run is active at a time:
// overlapping cron invocations are skipped via the cleanupCronRunning flag.
func (c *Controller) CleanUpDeletedFileData() {
	log.Info("Cleaning up deleted file data")
	if c.cleanupCronRunning {
		log.Info("Skipping CleanUpDeletedFileData cron run as another instance is still running")
		return
	}
	c.cleanupCronRunning = true
	defer func() { c.cleanupCronRunning = false }()
	// Pull up to 200 queue entries whose deletion delay has elapsed.
	queueItems, fetchErr := c.QueueRepo.GetItemsReadyForDeletion(repo.DeleteFileDataQueue, 200)
	if fetchErr != nil {
		log.WithError(fetchErr).Error("Failed to fetch items from queue")
		return
	}
	for _, queueItem := range queueItems {
		c.deleteFileData(queueItem)
	}
}
// deleteFileData removes every object-store artifact (preview image/video,
// playlist, derived metadata) for the file referenced by qItem, then clears
// the matching file_data rows and finally removes the queue entry.
//
// A per-file task lock ensures only one worker processes a given file at a
// time. The queue entry is removed only after all other steps succeed, so a
// failed run is retried on the next cron invocation.
func (c *Controller) deleteFileData(qItem repo.QueueItem) {
	lockName := fmt.Sprintf("FileDataDelete:%s", qItem.Item)
	lockStatus, err := c.TaskLockingRepo.AcquireLock(lockName, time.MicrosecondsAfterHours(1), c.HostName)
	ctxLogger := log.WithField("item", qItem.Item).WithField("queue_id", qItem.Id)
	if err != nil || !lockStatus {
		ctxLogger.Warn("unable to acquire lock")
		return
	}
	defer func() {
		// Use a shadowed error so the release doesn't clobber the outer err.
		if releaseErr := c.TaskLockingRepo.ReleaseLock(lockName); releaseErr != nil {
			ctxLogger.Errorf("Error while releasing lock %s", releaseErr)
		}
	}()
	ctxLogger.Debug("Deleting all file data")
	// Queue items carry the fileID as a decimal string; bail out instead of
	// silently operating on fileID 0 when the payload is malformed.
	fileID, parseErr := strconv.ParseInt(qItem.Item, 10, 64)
	if parseErr != nil {
		ctxLogger.WithError(parseErr).Error("Failed to parse fileID from queue item")
		return
	}
	ownerID, err := c.FileRepo.GetOwnerID(fileID)
	if err != nil {
		ctxLogger.WithError(err).Error("Failed to fetch ownerID")
		return
	}
	rows, err := c.Repo.GetFileData(context.Background(), fileID)
	if err != nil {
		ctxLogger.WithError(err).Error("Failed to fetch datacenters")
		return
	}
	for i := range rows {
		fileDataRow := rows[i]
		objectKeys := filedata.AllObjects(fileID, ownerID, fileDataRow.Type)
		// Delete from delete/stale buckets, pruning each bucket from the DB
		// row only after its objects are gone.
		for _, bucketID := range fileDataRow.DeleteFromBuckets {
			if err = c.deleteObjectsFromBucket(objectKeys, bucketID); err != nil {
				ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
				return
			}
			if dbErr := c.Repo.RemoveBucketFromDeletedBuckets(fileDataRow, bucketID); dbErr != nil {
				ctxLogger.WithError(dbErr).Error("Failed to remove from db")
				return
			}
		}
		// Delete from replicated buckets.
		for _, bucketID := range fileDataRow.ReplicatedBuckets {
			if err = c.deleteObjectsFromBucket(objectKeys, bucketID); err != nil {
				ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
				return
			}
			if dbErr := c.Repo.RemoveBucketFromReplicatedBuckets(fileDataRow, bucketID); dbErr != nil {
				ctxLogger.WithError(dbErr).Error("Failed to remove from db")
				return
			}
		}
		// Delete from the latest bucket last, then drop the DB row (the row
		// delete only succeeds once no other bucket still references the data).
		if err = c.deleteObjectsFromBucket(objectKeys, fileDataRow.LatestBucket); err != nil {
			ctxLogger.WithError(err).Error("Failed to delete object from datacenter")
			return
		}
		if dbErr := c.Repo.DeleteFileData(context.Background(), fileDataRow.FileID, fileDataRow.Type, fileDataRow.LatestBucket); dbErr != nil {
			ctxLogger.WithError(dbErr).Error("Failed to remove from db")
			return
		}
	}
	// Everything is gone; only now is it safe to drop the queue entry.
	if err = c.QueueRepo.DeleteItem(repo.DeleteFileDataQueue, qItem.Item); err != nil {
		ctxLogger.WithError(err).Error("Failed to remove item from the queue")
		return
	}
	ctxLogger.Info("Successfully deleted all file data")
}

// deleteObjectsFromBucket deletes every key in objectKeys from the given
// bucket, returning the first error encountered.
func (c *Controller) deleteObjectsFromBucket(objectKeys []string, bucketID string) error {
	for _, objectKey := range objectKeys {
		if err := c.ObjectCleanupController.DeleteObjectFromDataCenter(objectKey, bucketID); err != nil {
			return err
		}
	}
	return nil
}

View File

@ -1,5 +0,0 @@
package filedata
func (c *Controller) f() {
}

View File

@ -52,6 +52,88 @@ func (r *Repository) GetFilesData(ctx context.Context, oType ente.ObjectType, fi
return nil, stacktrace.Propagate(err, "")
}
return convertRowsToFilesData(rows)
}
// GetFileData returns all file_data rows (one per data type) for the given
// file, including rows already flagged is_deleted.
func (r *Repository) GetFileData(ctx context.Context, fileID int64) ([]filedata.Row, error) {
	// Parameter renamed from fileIDs: this queries a single file's rows.
	rows, err := r.DB.QueryContext(ctx, `SELECT file_id, user_id, data_type, size, latest_bucket, replicated_buckets, delete_from_buckets, pending_sync, is_deleted, last_sync_time, created_at, updated_at
	FROM file_data
	WHERE file_id = $1`, fileID)
	if err != nil {
		return nil, stacktrace.Propagate(err, "")
	}
	return convertRowsToFilesData(rows)
}
// RemoveBucketFromDeletedBuckets removes bucketID from the delete_from_buckets
// array of the row identified by (file_id, data_type), de-duplicating and
// dropping NULL entries while rebuilding the array. Only rows already marked
// is_deleted = true are touched; an error is returned when no row was updated
// so the caller retries instead of silently skipping the cleanup step.
func (r *Repository) RemoveBucketFromDeletedBuckets(row filedata.Row, bucketID string) error {
	query := `
	UPDATE file_data
	SET delete_from_buckets = array(
	    SELECT DISTINCT elem FROM unnest(
	        array_remove(
	            file_data.delete_from_buckets,
	            $1
	        )
	    ) AS elem
	    WHERE elem IS NOT NULL
	)
	WHERE file_id = $2 AND data_type = $3 and is_deleted = true`
	// $1 = bucket being pruned; $2/$3 identify the file_data row.
	result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type))
	if err != nil {
		return stacktrace.Propagate(err, "failed to remove bucket from deleted buckets")
	}
	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	// Zero rows means the row was missing or not marked deleted — surface it.
	if rowsAffected == 0 {
		return stacktrace.NewError("bucket not removed from deleted buckets")
	}
	return nil
}
// RemoveBucketFromReplicatedBuckets removes bucketID from the
// replicated_buckets array of the row identified by (file_id, data_type),
// de-duplicating and dropping NULL entries while rebuilding the array. An
// error is returned when no row was updated so the caller retries the cleanup.
//
// NOTE(review): unlike RemoveBucketFromDeletedBuckets this does not filter on
// is_deleted = true — confirm that is intentional.
func (r *Repository) RemoveBucketFromReplicatedBuckets(row filedata.Row, bucketID string) error {
	query := `
	UPDATE file_data
	SET replicated_buckets = array(
	    SELECT DISTINCT elem FROM unnest(
	        array_remove(
	            file_data.replicated_buckets,
	            $1
	        )
	    ) AS elem
	    WHERE elem IS NOT NULL
	)
	WHERE file_id = $2 AND data_type = $3`
	result, err := r.DB.Exec(query, bucketID, row.FileID, string(row.Type))
	if err != nil {
		return stacktrace.Propagate(err, "failed to remove bucket from replicated buckets")
	}
	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return stacktrace.Propagate(err, "")
	}
	if rowsAffected == 0 {
		// Fixed copy-paste error: this previously said "deleted buckets",
		// making failures here indistinguishable from the sibling method.
		return stacktrace.NewError("bucket not removed from replicated buckets")
	}
	return nil
}
// DeleteFileData drops the file_data row for the given file and data type,
// but only when latest_bucket matches and both the replicated and the
// delete-from bucket arrays are already empty. When no row matches (i.e. some
// bucket still references the data) an error is returned so the caller retries.
func (r *Repository) DeleteFileData(ctx context.Context, fileID int64, oType ente.ObjectType, latestBucketID string) error {
	query := `
	DELETE FROM file_data
	WHERE file_id = $1 AND data_type = $2 AND latest_bucket = $3 AND replicated_buckets = ARRAY[]::s3region[] AND delete_from_buckets = ARRAY[]::s3region[]`
	result, execErr := r.DB.ExecContext(ctx, query, fileID, string(oType), latestBucketID)
	if execErr != nil {
		return stacktrace.Propagate(execErr, "")
	}
	affected, countErr := result.RowsAffected()
	if countErr != nil {
		return stacktrace.Propagate(countErr, "")
	}
	if affected == 0 {
		return stacktrace.NewError("file data not deleted")
	}
	return nil
}

View File

@ -148,12 +148,21 @@ func (repo *ObjectRepository) MarkObjectsAsDeletedForFileIDs(ctx context.Context
for _, fileID := range fileIDs {
embeddingsToBeDeleted = append(embeddingsToBeDeleted, strconv.FormatInt(fileID, 10))
}
_, err = tx.ExecContext(ctx, `UPDATE file_data SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
err = repo.QueueRepo.AddItems(ctx, tx, DeleteEmbeddingsQueue, embeddingsToBeDeleted)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
err = repo.QueueRepo.AddItems(ctx, tx, DeleteFileDataQueue, embeddingsToBeDeleted)
if err != nil {
return nil, stacktrace.Propagate(err, "")
}
_, err = tx.ExecContext(ctx, `UPDATE object_keys SET is_deleted = TRUE WHERE file_id = ANY($1)`, pq.Array(fileIDs))
if err != nil {
return nil, stacktrace.Propagate(err, "")

View File

@ -23,6 +23,7 @@ var itemDeletionDelayInMinMap = map[string]int64{
DropFileEncMedataQueue: -1 * 24 * 60, // -ve value to ensure attributes are immediately removed
DeleteObjectQueue: 45 * 24 * 60, // 45 days in minutes
DeleteEmbeddingsQueue: -1 * 24 * 60, // -ve value to ensure embeddings are immediately removed
DeleteFileDataQueue: -1 * 24 * 60, // -ve value to ensure file-data is immediately removed
TrashCollectionQueueV3: -1 * 24 * 60, // -ve value to ensure collections are immediately marked as trashed
TrashEmptyQueue: -1 * 24 * 60, // -ve value to ensure empty trash request are processed in next cron run
RemoveComplianceHoldQueue: -1 * 24 * 60, // -ve value to ensure compliance hold is removed in next cron run
@ -32,6 +33,7 @@ const (
DropFileEncMedataQueue string = "dropFileEncMetata"
DeleteObjectQueue string = "deleteObject"
DeleteEmbeddingsQueue string = "deleteEmbedding"
DeleteFileDataQueue string = "deleteFileData"
OutdatedObjectsQueue string = "outdatedObject"
// Deprecated: Keeping it till we clean up items from the queue DB.
TrashCollectionQueue string = "trashCollection"