[mob][photos] Simplify MLIndexingIsolate

This commit is contained in:
laurenspriem 2024-09-02 18:12:11 +02:00
parent 947bd57b99
commit e1288bfd61
3 changed files with 95 additions and 201 deletions

View File

@ -4,11 +4,22 @@ import 'dart:typed_data' show Uint8List;
import "package:logging/logging.dart"; import "package:logging/logging.dart";
import "package:photos/models/ml/face/box.dart"; import "package:photos/models/ml/face/box.dart";
import "package:photos/services/machine_learning/ml_model.dart"; import "package:photos/services/machine_learning/ml_model.dart";
import "package:photos/services/machine_learning/ml_result.dart";
import "package:photos/services/machine_learning/semantic_search/clip/clip_text_encoder.dart"; import "package:photos/services/machine_learning/semantic_search/clip/clip_text_encoder.dart";
import "package:photos/services/machine_learning/semantic_search/clip/clip_text_tokenizer.dart"; import "package:photos/services/machine_learning/semantic_search/clip/clip_text_tokenizer.dart";
import "package:photos/utils/image_ml_util.dart"; import "package:photos/utils/image_ml_util.dart";
import "package:photos/utils/ml_util.dart";
enum IsolateOperation { enum IsolateOperation {
/// [MLIndexingIsolate]
analyzeImage,
/// [MLIndexingIsolate]
loadIndexingModels,
/// [MLIndexingIsolate]
releaseIndexingModels,
/// [MLComputer] /// [MLComputer]
generateFaceThumbnails, generateFaceThumbnails,
@ -33,6 +44,43 @@ Future<dynamic> isolateFunction(
Map<String, dynamic> args, Map<String, dynamic> args,
) async { ) async {
switch (function) { switch (function) {
/// Cases for MLIndexingIsolate start here
/// MLIndexingIsolate
case IsolateOperation.analyzeImage:
final MLResult result = await analyzeImageStatic(args);
return result.toJsonString();
/// MLIndexingIsolate
case IsolateOperation.loadIndexingModels:
final modelNames = args['modelNames'] as List<String>;
final modelPaths = args['modelPaths'] as List<String>;
final addresses = <int>[];
for (int i = 0; i < modelNames.length; i++) {
// TODO:lau check logging here
final int address = await MlModel.loadModel(
modelNames[i],
modelPaths[i],
);
addresses.add(address);
}
return List<int>.from(addresses, growable: false);
/// MLIndexingIsolate
case IsolateOperation.releaseIndexingModels:
// TODO:lau check logging here
final modelNames = args['modelNames'] as List<String>;
final modelAddresses = args['modelAddresses'] as List<int>;
for (int i = 0; i < modelNames.length; i++) {
await MlModel.releaseModel(
modelNames[i],
modelAddresses[i],
);
}
return true;
/// Cases for MLIndexingIsolate stop here
/// Cases for MLComputer start here /// Cases for MLComputer start here
/// MLComputer /// MLComputer

View File

@ -99,7 +99,7 @@ abstract class SuperIsolate {
return _functionLock.synchronized(() async { return _functionLock.synchronized(() async {
if (shouldAutomaticDispose) _resetInactivityTimer(); if (shouldAutomaticDispose) _resetInactivityTimer();
if (postFunctionlockStop()) { if (postFunctionlockStop(operation)) {
return null; return null;
} }
@ -135,7 +135,7 @@ abstract class SuperIsolate {
}); });
} }
bool postFunctionlockStop() => false; bool postFunctionlockStop(IsolateOperation operation) => false;
/// Resets a timer that kills the isolate after a certain amount of inactivity. /// Resets a timer that kills the isolate after a certain amount of inactivity.
/// ///
@ -149,7 +149,7 @@ abstract class SuperIsolate {
_resetInactivityTimer(); _resetInactivityTimer();
} else { } else {
logger.info( logger.info(
'Clustering Isolate has been inactive for ${_inactivityDuration.inSeconds} seconds with no tasks running. Killing isolate.', 'Isolate has been inactive for ${_inactivityDuration.inSeconds} seconds with no tasks running. Killing isolate.',
); );
_disposeIsolate(); _disposeIsolate();
} }

View File

@ -1,36 +1,43 @@
import "dart:async"; import "dart:async";
import "dart:isolate";
import "package:dart_ui_isolate/dart_ui_isolate.dart"; import "package:flutter/foundation.dart" show debugPrint;
import "package:flutter/foundation.dart" show debugPrint, kDebugMode;
import "package:logging/logging.dart"; import "package:logging/logging.dart";
import "package:photos/core/error-reporting/super_logging.dart"; import "package:photos/services/isolate_functions.dart";
import "package:photos/services/isolate_service.dart";
import 'package:photos/services/machine_learning/face_ml/face_detection/face_detection_service.dart'; import 'package:photos/services/machine_learning/face_ml/face_detection/face_detection_service.dart';
import 'package:photos/services/machine_learning/face_ml/face_embedding/face_embedding_service.dart'; import 'package:photos/services/machine_learning/face_ml/face_embedding/face_embedding_service.dart';
import "package:photos/services/machine_learning/ml_model.dart";
import "package:photos/services/machine_learning/ml_models_overview.dart"; import "package:photos/services/machine_learning/ml_models_overview.dart";
import 'package:photos/services/machine_learning/ml_result.dart'; import 'package:photos/services/machine_learning/ml_result.dart';
import "package:photos/services/machine_learning/semantic_search/clip/clip_image_encoder.dart"; import "package:photos/services/machine_learning/semantic_search/clip/clip_image_encoder.dart";
import "package:photos/utils/ml_util.dart"; import "package:photos/utils/ml_util.dart";
import "package:synchronized/synchronized.dart";
enum MLIndexingOperation { analyzeImage, loadModels, releaseModels } class MLIndexingIsolate extends SuperIsolate {
@override
Logger get logger => _logger;
final _logger = Logger("MLIndexingIsolate");
class MLIndexingIsolate { @override
static final _logger = Logger("MLIndexingIsolate"); bool get isDartUiIsolate => true;
Timer? _inactivityTimer; @override
final Duration _inactivityDuration = const Duration(seconds: 120); String get isolateName => "MLIndexingIsolate";
int _activeTasks = 0;
final _functionLock = Lock(); @override
final _initIsolateLock = Lock(); bool get shouldAutomaticDispose => true;
late DartUiIsolate _isolate; @override
late ReceivePort _receivePort = ReceivePort(); Future<void> onDispose() async {
late SendPort _mainSendPort; await _releaseModels();
}
bool _isIsolateSpawned = false; @override
bool postFunctionlockStop(IsolateOperation operation) {
if (operation == IsolateOperation.analyzeImage &&
shouldPauseIndexingAndClustering) {
return true;
}
return false;
}
bool shouldPauseIndexingAndClustering = false; bool shouldPauseIndexingAndClustering = false;
@ -39,152 +46,6 @@ class MLIndexingIsolate {
static final instance = MLIndexingIsolate._privateConstructor(); static final instance = MLIndexingIsolate._privateConstructor();
factory MLIndexingIsolate() => instance; factory MLIndexingIsolate() => instance;
Future<void> _initIsolate() async {
return _initIsolateLock.synchronized(() async {
if (_isIsolateSpawned) return;
_logger.info("initIsolate called");
_receivePort = ReceivePort();
try {
_isolate = await DartUiIsolate.spawn(
_isolateMain,
_receivePort.sendPort,
);
_mainSendPort = await _receivePort.first as SendPort;
_isIsolateSpawned = true;
_resetInactivityTimer();
_logger.info('initIsolate done');
} catch (e) {
_logger.severe('Could not spawn isolate', e);
_isIsolateSpawned = false;
}
});
}
/// The main execution function of the isolate.
@pragma('vm:entry-point')
static void _isolateMain(SendPort mainSendPort) async {
Logger.root.level = kDebugMode ? Level.ALL : Level.INFO;
// TODO:lau move to right isolate logging
Logger.root.onRecord.listen((LogRecord rec) {
debugPrint('[MLIsolate] ${rec.toPrettyString()}');
});
final receivePort = ReceivePort();
mainSendPort.send(receivePort.sendPort);
receivePort.listen((message) async {
final functionIndex = message[0] as int;
final function = MLIndexingOperation.values[functionIndex];
final args = message[1] as Map<String, dynamic>;
final sendPort = message[2] as SendPort;
try {
switch (function) {
case MLIndexingOperation.analyzeImage:
final MLResult result = await analyzeImageStatic(args);
sendPort.send(result.toJsonString());
break;
case MLIndexingOperation.loadModels:
final modelNames = args['modelNames'] as List<String>;
final modelPaths = args['modelPaths'] as List<String>;
final addresses = <int>[];
for (int i = 0; i < modelNames.length; i++) {
// TODO:lau check logging here
final int address = await MlModel.loadModel(
modelNames[i],
modelPaths[i],
);
addresses.add(address);
}
sendPort.send(List<int>.from(addresses, growable: false));
break;
case MLIndexingOperation.releaseModels:
// TODO:lau check logging here
final modelNames = args['modelNames'] as List<String>;
final modelAddresses = args['modelAddresses'] as List<int>;
for (int i = 0; i < modelNames.length; i++) {
await MlModel.releaseModel(
modelNames[i],
modelAddresses[i],
);
}
sendPort.send(true);
break;
}
} catch (e, s) {
_logger.severe("Error in FaceML isolate", e, s);
sendPort.send({'error': e.toString(), 'stackTrace': s.toString()});
}
});
}
/// The common method to run any operation in the isolate. It sends the [message] to [_isolateMain] and waits for the result.
Future<dynamic> _runInIsolate(
(MLIndexingOperation, Map<String, dynamic>) message,
) async {
await _initIsolate();
return _functionLock.synchronized(() async {
_resetInactivityTimer();
if (message.$1 == MLIndexingOperation.analyzeImage &&
shouldPauseIndexingAndClustering) {
return null;
}
final completer = Completer<dynamic>();
final answerPort = ReceivePort();
_activeTasks++;
_mainSendPort.send([message.$1.index, message.$2, answerPort.sendPort]);
answerPort.listen((receivedMessage) {
if (receivedMessage is Map && receivedMessage.containsKey('error')) {
// Handle the error
final errorMessage = receivedMessage['error'];
final errorStackTrace = receivedMessage['stackTrace'];
final exception = Exception(errorMessage);
final stackTrace = StackTrace.fromString(errorStackTrace);
completer.completeError(exception, stackTrace);
} else {
completer.complete(receivedMessage);
}
});
_activeTasks--;
return completer.future;
});
}
/// Resets a timer that kills the isolate after a certain amount of inactivity.
///
/// Should be called after initialization (e.g. inside `init()`) and after every call to isolate (e.g. inside `_runInIsolate()`)
void _resetInactivityTimer() {
_inactivityTimer?.cancel();
_inactivityTimer = Timer(_inactivityDuration, () {
if (_activeTasks > 0) {
_logger.info('Tasks are still running. Delaying isolate disposal.');
// Optionally, reschedule the timer to check again later.
_resetInactivityTimer();
} else {
_logger.info(
'Clustering Isolate has been inactive for ${_inactivityDuration.inSeconds} seconds with no tasks running. Killing isolate.',
);
_dispose();
}
});
}
void _dispose() async {
if (!_isIsolateSpawned) return;
_logger.info('Disposing isolate and models');
await _releaseModels();
_isIsolateSpawned = false;
_isolate.kill();
_receivePort.close();
_inactivityTimer?.cancel();
}
/// Analyzes the given image data by running the full pipeline for faces, using [_analyzeImageSync] in the isolate. /// Analyzes the given image data by running the full pipeline for faces, using [_analyzeImageSync] in the isolate.
Future<MLResult?> analyzeImage( Future<MLResult?> analyzeImage(
FileMLInstruction instruction, FileMLInstruction instruction,
@ -194,22 +55,16 @@ class MLIndexingIsolate {
late MLResult result; late MLResult result;
try { try {
final resultJsonString = await _runInIsolate( final resultJsonString =
( await runInIsolate(IsolateOperation.analyzeImage, {
MLIndexingOperation.analyzeImage, "enteFileID": instruction.file.uploadedFileID ?? -1,
{ "filePath": filePath,
"enteFileID": instruction.file.uploadedFileID ?? -1, "runFaces": instruction.shouldRunFaces,
"filePath": filePath, "runClip": instruction.shouldRunClip,
"runFaces": instruction.shouldRunFaces, "faceDetectionAddress": FaceDetectionService.instance.sessionAddress,
"runClip": instruction.shouldRunClip, "faceEmbeddingAddress": FaceEmbeddingService.instance.sessionAddress,
"faceDetectionAddress": "clipImageAddress": ClipImageEncoder.instance.sessionAddress,
FaceDetectionService.instance.sessionAddress, }) as String?;
"faceEmbeddingAddress":
FaceEmbeddingService.instance.sessionAddress,
"clipImageAddress": ClipImageEncoder.instance.sessionAddress,
}
),
) as String?;
if (resultJsonString == null) { if (resultJsonString == null) {
if (!shouldPauseIndexingAndClustering) { if (!shouldPauseIndexingAndClustering) {
_logger.severe('Analyzing image in isolate is giving back null'); _logger.severe('Analyzing image in isolate is giving back null');
@ -264,15 +119,11 @@ class MLIndexingIsolate {
} }
try { try {
final addresses = await _runInIsolate( final addresses =
( await runInIsolate(IsolateOperation.loadIndexingModels, {
MLIndexingOperation.loadModels, "modelNames": modelNames,
{ "modelPaths": modelPaths,
"modelNames": modelNames, }) as List<int>;
"modelPaths": modelPaths,
}
),
) as List<int>;
for (int i = 0; i < models.length; i++) { for (int i = 0; i < models.length; i++) {
final model = models[i].model; final model = models[i].model;
final address = addresses[i]; final address = addresses[i];
@ -299,15 +150,10 @@ class MLIndexingIsolate {
} }
if (modelNames.isEmpty) return; if (modelNames.isEmpty) return;
try { try {
await _runInIsolate( await runInIsolate(IsolateOperation.releaseIndexingModels, {
( "modelNames": modelNames,
MLIndexingOperation.releaseModels, "modelAddresses": modelAddresses,
{ });
"modelNames": modelNames,
"modelAddresses": modelAddresses,
}
),
);
for (final model in models) { for (final model in models) {
model.model.releaseSessionAddress(); model.model.releaseSessionAddress();
} }