[mob] Modify instruction based on remote response

This commit is contained in:
Neeraj Gupta 2024-07-24 12:32:56 +05:30
parent a996ec3ac7
commit ef372ebfa4
3 changed files with 76 additions and 78 deletions

View File

@ -88,71 +88,69 @@ class FaceRecognitionService {
_isSyncing = false;
}
Future<void> _syncFaceEmbeddings({int retryFetchCount = 10}) async {
Future<List<FileMLInstruction>> _syncFaceEmbeddings() async {
final List<FileMLInstruction> filesToIndex = await getFilesForMlIndexing();
final Map<int, FileMLInstruction> pendingIndex = {};
final List<List<FileMLInstruction>> chunks =
filesToIndex.chunks(_embeddingFetchLimit); // Chunks of 200
int filesIndexedForFaces = 0;
int filesIndexedForClip = 0;
for (final chunk in chunks) {
try {
final fileIds = chunk
.map((instruction) => instruction.enteFile.uploadedFileID!)
.toSet();
_logger.info('starting remote fetch for ${fileIds.length} files');
final res =
await RemoteFileMLService.instance.getFileEmbeddings(fileIds);
_logger.info('fetched ${res.mlData.length} embeddings');
final List<Face> faces = [];
final List<ClipEmbedding> clipEmbeddings = [];
for (RemoteFileML fileMl in res.mlData.values) {
final facesFromRemoteEmbedding = _getFacesFromRemoteEmbedding(fileMl);
//Note: Always do null check, empty value means no face was found.
if (facesFromRemoteEmbedding != null) {
faces.addAll(facesFromRemoteEmbedding);
filesIndexedForFaces++;
}
if (fileMl.clipEmbedding != null &&
fileMl.clipEmbedding!.version >= clipMlVersion) {
clipEmbeddings.add(
ClipEmbedding(
fileID: fileMl.fileID,
embedding: fileMl.clipEmbedding!.embedding,
version: fileMl.clipEmbedding!.version,
),
);
filesIndexedForClip++;
}
final Set<int> ids = {};
for (final instruction in chunk) {
ids.add(instruction.file.uploadedFileID!);
pendingIndex[instruction.file.uploadedFileID!] = instruction;
}
_logger.info('starting remote fetch for ${ids.length} files');
final res = await RemoteFileMLService.instance.getFileEmbeddings(ids);
_logger.info('fetched ${res.mlData.length} embeddings');
final List<Face> faces = [];
final List<ClipEmbedding> clipEmbeddings = [];
for (RemoteFileML fileMl in res.mlData.values) {
final existingInstruction = pendingIndex[fileMl.fileID]!;
final facesFromRemoteEmbedding = _getFacesFromRemoteEmbedding(fileMl);
//Note: Always do null check, empty value means no face was found.
if (facesFromRemoteEmbedding != null) {
faces.addAll(facesFromRemoteEmbedding);
filesIndexedForFaces++;
existingInstruction.shouldRunFaces = false;
}
if (res.noEmbeddingFileIDs.isNotEmpty) {
_logger.info(
'No embeddings found for ${res.noEmbeddingFileIDs.length} files',
if (fileMl.clipEmbedding != null &&
fileMl.clipEmbedding!.version >= clipMlVersion) {
clipEmbeddings.add(
ClipEmbedding(
fileID: fileMl.fileID,
embedding: fileMl.clipEmbedding!.embedding,
version: fileMl.clipEmbedding!.version,
),
);
for (final fileID in res.noEmbeddingFileIDs) {
faces.add(Face.empty(fileID, error: false));
}
existingInstruction.shouldRunClip = false;
filesIndexedForClip++;
}
await FaceMLDataDB.instance.bulkInsertFaces(faces);
await EmbeddingsDB.instance.putMany(clipEmbeddings);
_logger.info(
'Embedding store files for face $filesIndexedForFaces, and clip $filesIndexedForClip',
);
} catch (e, s) {
_logger.severe("err while getting files embeddings", e, s);
if (retryFetchCount < 1000) {
Future.delayed(Duration(seconds: retryFetchCount), () {
unawaited(
_syncFaceEmbeddings(retryFetchCount: retryFetchCount * 2),
);
});
return;
if (!existingInstruction.pendingML) {
pendingIndex.remove(fileMl.fileID);
} else {
_logger.severe("embeddingFetch failed with retries", e, s);
rethrow;
existingInstruction.existingRemoteFileML = fileMl;
pendingIndex[fileMl.fileID] = existingInstruction;
}
}
if (res.noEmbeddingFileIDs.isNotEmpty) {
_logger.info(
'No embeddings found for ${res.noEmbeddingFileIDs.length} files',
);
for (final fileID in res.noEmbeddingFileIDs) {
faces.add(Face.empty(fileID, error: false));
}
}
await FaceMLDataDB.instance.bulkInsertFaces(faces);
await EmbeddingsDB.instance.putMany(clipEmbeddings);
_logger.info(
'Embedding store files for face $filesIndexedForFaces, and clip $filesIndexedForClip',
);
}
final List<FileMLInstruction> instructions = pendingIndex.values.toList();
return instructions;
}
// Returns a list of faces from the given remote fileML. null if the version is less than the current version

View File

@ -393,7 +393,7 @@ class MLService {
Future<bool> processImage(FileMLInstruction instruction) async {
// TODO: clean this function up
_logger.info(
"`processImage` start processing image with uploadedFileID: ${instruction.enteFile.uploadedFileID}",
"`processImage` start processing image with uploadedFileID: ${instruction.file.uploadedFileID}",
);
bool actuallyRanML = false;
@ -404,7 +404,7 @@ class MLService {
if (result == null) {
if (!_shouldPauseIndexingAndClustering) {
_logger.severe(
"Failed to analyze image with uploadedFileID: ${instruction.enteFile.uploadedFileID}",
"Failed to analyze image with uploadedFileID: ${instruction.file.uploadedFileID}",
);
}
return actuallyRanML;
@ -414,7 +414,7 @@ class MLService {
final List<Face> faces = [];
if (result.foundNoFaces) {
debugPrint(
'No faces detected for file with name:${instruction.enteFile.displayName}',
'No faces detected for file with name:${instruction.file.displayName}',
);
faces.add(
Face.empty(result.fileId, error: result.errorOccured),
@ -425,9 +425,9 @@ class MLService {
result.decodedImageSize.height == -1) {
_logger.severe(
"decodedImageSize is not stored correctly for image with "
"ID: ${instruction.enteFile.uploadedFileID}");
"ID: ${instruction.file.uploadedFileID}");
_logger.info(
"Using aligned image size for image with ID: ${instruction.enteFile.uploadedFileID}. This size is ${result.decodedImageSize.width}x${result.decodedImageSize.height} compared to size of ${instruction.enteFile.width}x${instruction.enteFile.height} in the metadata",
"Using aligned image size for image with ID: ${instruction.file.uploadedFileID}. This size is ${result.decodedImageSize.width}x${result.decodedImageSize.height} compared to size of ${instruction.file.width}x${instruction.file.height} in the metadata",
);
}
for (int i = 0; i < result.faces!.length; ++i) {
@ -467,10 +467,10 @@ class MLService {
_logger.info("inserting ${faces.length} faces for ${result.fileId}");
if (!result.errorOccured) {
await RemoteFileMLService.instance.putFileEmbedding(
instruction.enteFile,
instruction.file,
instruction.existingRemoteFileML ??
RemoteFileML.empty(
instruction.enteFile.uploadedFileID!,
instruction.file.uploadedFileID!,
),
faceEmbedding: result.facesRan
? RemoteFaceEmbedding(
@ -501,25 +501,25 @@ class MLService {
actuallyRanML = true;
await SemanticSearchService.storeClipImageResult(
result.clip!,
instruction.enteFile,
instruction.file,
);
}
} on ThumbnailRetrievalException catch (e, s) {
_logger.severe(
'ThumbnailRetrievalException while processing image with ID ${instruction.enteFile.uploadedFileID}, storing empty face so indexing does not get stuck',
'ThumbnailRetrievalException while processing image with ID ${instruction.file.uploadedFileID}, storing empty face so indexing does not get stuck',
e,
s,
);
await FaceMLDataDB.instance.bulkInsertFaces(
[Face.empty(instruction.enteFile.uploadedFileID!, error: true)],
[Face.empty(instruction.file.uploadedFileID!, error: true)],
);
await SemanticSearchService.storeEmptyClipImageResult(
instruction.enteFile,
instruction.file,
);
return true;
} catch (e, s) {
_logger.severe(
"Failed to analyze using FaceML for image with ID: ${instruction.enteFile.uploadedFileID}. Not storing any faces, which means it will be automatically retried later.",
"Failed to analyze using FaceML for image with ID: ${instruction.file.uploadedFileID}. Not storing any faces, which means it will be automatically retried later.",
e,
s,
);
@ -756,7 +756,7 @@ class MLService {
Future<MLResult?> _analyzeImageInSingleIsolate(
FileMLInstruction instruction,
) async {
final String filePath = await getImagePathForML(instruction.enteFile);
final String filePath = await getImagePathForML(instruction.file);
final Stopwatch stopwatch = Stopwatch()..start();
late MLResult result;
@ -766,7 +766,7 @@ class MLService {
(
FaceMlOperation.analyzeImage,
{
"enteFileID": instruction.enteFile.uploadedFileID ?? -1,
"enteFileID": instruction.file.uploadedFileID ?? -1,
"filePath": filePath,
"runFaces": instruction.shouldRunFaces,
"runClip": instruction.shouldRunClip,
@ -787,21 +787,21 @@ class MLService {
result = MLResult.fromJsonString(resultJsonString);
} catch (e, s) {
_logger.severe(
"Could not analyze image with ID ${instruction.enteFile.uploadedFileID} \n",
"Could not analyze image with ID ${instruction.file.uploadedFileID} \n",
e,
s,
);
debugPrint(
"This image with ID ${instruction.enteFile.uploadedFileID} has name ${instruction.enteFile.displayName}.",
"This image with ID ${instruction.file.uploadedFileID} has name ${instruction.file.displayName}.",
);
final resultBuilder =
MLResult.fromEnteFileID(instruction.enteFile.uploadedFileID!)
MLResult.fromEnteFileID(instruction.file.uploadedFileID!)
..errorOccurred();
return resultBuilder;
}
stopwatch.stop();
_logger.info(
"Finished Analyze image with uploadedFileID ${instruction.enteFile.uploadedFileID}, in "
"Finished Analyze image with uploadedFileID ${instruction.file.uploadedFileID}, in "
"${stopwatch.elapsedMilliseconds} ms (including time waiting for inference engine availability)",
);

View File

@ -28,18 +28,18 @@ class IndexStatus {
}
class FileMLInstruction {
final EnteFile enteFile;
final bool shouldRunFaces;
final bool shouldRunClip;
final EnteFile file;
bool shouldRunFaces;
bool shouldRunClip;
RemoteFileML? existingRemoteFileML;
FileMLInstruction({
required this.enteFile,
required this.file,
required this.shouldRunFaces,
required this.shouldRunClip,
});
// Returns true if the file should be indexed for either faces or clip
bool get pendingML => shouldRunFaces || shouldRunClip;
}
Future<IndexStatus> getIndexStatus() async {
@ -89,7 +89,7 @@ Future<List<FileMLInstruction>> getFilesForMlIndexing() async {
continue;
}
final instruction = FileMLInstruction(
enteFile: enteFile,
file: enteFile,
shouldRunFaces: shouldRunFaces,
shouldRunClip: shouldRunClip,
);
@ -111,7 +111,7 @@ Future<List<FileMLInstruction>> getFilesForMlIndexing() async {
continue;
}
final instruction = FileMLInstruction(
enteFile: enteFile,
file: enteFile,
shouldRunFaces: shouldRunFaces,
shouldRunClip: shouldRunClip,
);