[mob][photos] Refactor of flags for faceMlService

This commit is contained in:
laurenspriem 2024-05-22 14:41:44 +05:30
parent 78afae4013
commit 678efd1e8b
3 changed files with 123 additions and 97 deletions

View File

@ -74,7 +74,7 @@ class FaceMlService {
late ReceivePort _receivePort = ReceivePort(); late ReceivePort _receivePort = ReceivePort();
late SendPort _mainSendPort; late SendPort _mainSendPort;
bool isIsolateSpawned = false; bool _isIsolateSpawned = false;
// singleton pattern // singleton pattern
FaceMlService._privateConstructor(); FaceMlService._privateConstructor();
@ -89,12 +89,14 @@ class FaceMlService {
final _computer = Computer.shared(); final _computer = Computer.shared();
bool isInitialized = false; bool isInitialized = false;
late String client; late final String client;
bool canRunMLController = false; bool debugIndexingDisabled = false;
bool isImageIndexRunning = false; bool _mlControllerStatus = false;
bool isClusteringRunning = false; bool _isIndexingOrClusteringRunning = false;
bool shouldSyncPeople = false; bool _shouldPauseIndexingAndClustering = false;
bool _shouldSyncPeople = false;
bool _isSyncing = false;
final int _fileDownloadLimit = 10; final int _fileDownloadLimit = 10;
final int _embeddingFetchLimit = 200; final int _embeddingFetchLimit = 200;
@ -133,16 +135,16 @@ class FaceMlService {
_logger.info("client: $client"); _logger.info("client: $client");
isInitialized = true; isInitialized = true;
canRunMLController = !Platform.isAndroid || kDebugMode; _mlControllerStatus = !Platform.isAndroid;
/// hooking FaceML into [MachineLearningController] /// hooking FaceML into [MachineLearningController]
if (Platform.isAndroid && !kDebugMode) { if (Platform.isAndroid) {
Bus.instance.on<MachineLearningControlEvent>().listen((event) { Bus.instance.on<MachineLearningControlEvent>().listen((event) {
if (LocalSettings.instance.isFaceIndexingEnabled == false) { if (LocalSettings.instance.isFaceIndexingEnabled == false) {
return; return;
} }
canRunMLController = event.shouldRun; _mlControllerStatus = event.shouldRun;
if (canRunMLController) { if (_mlControllerStatus) {
_logger.info( _logger.info(
"MLController allowed running ML, faces indexing starting", "MLController allowed running ML, faces indexing starting",
); );
@ -150,13 +152,11 @@ class FaceMlService {
} else { } else {
_logger _logger
.info("MLController stopped running ML, faces indexing paused"); .info("MLController stopped running ML, faces indexing paused");
pauseIndexing(); pauseIndexingAndClustering();
} }
}); });
} else { } else {
if (!kDebugMode) { unawaited(indexAndClusterAll());
unawaited(indexAndClusterAll());
}
} }
}); });
} }
@ -167,22 +167,13 @@ class FaceMlService {
void listenIndexOnDiffSync() { void listenIndexOnDiffSync() {
Bus.instance.on<DiffSyncCompleteEvent>().listen((event) async { Bus.instance.on<DiffSyncCompleteEvent>().listen((event) async {
if (LocalSettings.instance.isFaceIndexingEnabled == false || kDebugMode) { unawaited(sync());
return;
}
// [neeraj] intentional delay in starting indexing on diff sync, this gives time for the user
// to disable face-indexing in case it's causing crash. In the future, we
// should have a better way to handle this.
shouldSyncPeople = true;
Future.delayed(const Duration(seconds: 10), () {
unawaited(indexAndClusterAll());
});
}); });
} }
void listenOnPeopleChangedSync() { void listenOnPeopleChangedSync() {
Bus.instance.on<PeopleChangedEvent>().listen((event) { Bus.instance.on<PeopleChangedEvent>().listen((event) {
shouldSyncPeople = true; _shouldSyncPeople = true;
}); });
} }
@ -218,9 +209,9 @@ class FaceMlService {
}); });
} }
Future<void> initIsolate() async { Future<void> _initIsolate() async {
return _initLockIsolate.synchronized(() async { return _initLockIsolate.synchronized(() async {
if (isIsolateSpawned) return; if (_isIsolateSpawned) return;
_logger.info("initIsolate called"); _logger.info("initIsolate called");
_receivePort = ReceivePort(); _receivePort = ReceivePort();
@ -231,19 +222,19 @@ class FaceMlService {
_receivePort.sendPort, _receivePort.sendPort,
); );
_mainSendPort = await _receivePort.first as SendPort; _mainSendPort = await _receivePort.first as SendPort;
isIsolateSpawned = true; _isIsolateSpawned = true;
_resetInactivityTimer(); _resetInactivityTimer();
} catch (e) { } catch (e) {
_logger.severe('Could not spawn isolate', e); _logger.severe('Could not spawn isolate', e);
isIsolateSpawned = false; _isIsolateSpawned = false;
} }
}); });
} }
Future<void> ensureSpawnedIsolate() async { Future<void> _ensureSpawnedIsolate() async {
if (!isIsolateSpawned) { if (!_isIsolateSpawned) {
await initIsolate(); await _initIsolate();
} }
} }
@ -286,11 +277,11 @@ class FaceMlService {
Future<dynamic> _runInIsolate( Future<dynamic> _runInIsolate(
(FaceMlOperation, Map<String, dynamic>) message, (FaceMlOperation, Map<String, dynamic>) message,
) async { ) async {
await ensureSpawnedIsolate(); await _ensureSpawnedIsolate();
return _functionLock.synchronized(() async { return _functionLock.synchronized(() async {
_resetInactivityTimer(); _resetInactivityTimer();
if (isImageIndexRunning == false || canRunMLController == false) { if (_shouldPauseIndexingAndClustering == false) {
return null; return null;
} }
@ -338,24 +329,31 @@ class FaceMlService {
} }
void disposeIsolate() async { void disposeIsolate() async {
if (!isIsolateSpawned) return; if (!_isIsolateSpawned) return;
await release(); await release();
isIsolateSpawned = false; _isIsolateSpawned = false;
_isolate.kill(); _isolate.kill();
_receivePort.close(); _receivePort.close();
_inactivityTimer?.cancel(); _inactivityTimer?.cancel();
} }
Future<void> indexAndClusterAll() async { Future<void> sync({bool forceSync = true}) async {
if (isClusteringRunning || isImageIndexRunning) { if (_isSyncing) {
_logger.info("indexing or clustering is already running, skipping");
return; return;
} }
if (shouldSyncPeople) { _isSyncing = true;
if (forceSync) {
await PersonService.instance.reconcileClusters(); await PersonService.instance.reconcileClusters();
shouldSyncPeople = false; _shouldSyncPeople = false;
} }
_isSyncing = false;
}
Future<void> indexAndClusterAll() async {
if (_cannotRunMLFunction()) return;
await sync(forceSync: _shouldSyncPeople);
await indexAllImages(); await indexAllImages();
final indexingCompleteRatio = await _getIndexedDoneRatio(); final indexingCompleteRatio = await _getIndexedDoneRatio();
if (indexingCompleteRatio < 0.95) { if (indexingCompleteRatio < 0.95) {
@ -368,35 +366,20 @@ class FaceMlService {
} }
} }
void pauseIndexingAndClustering() {
if (_isIndexingOrClusteringRunning) {
_shouldPauseIndexingAndClustering = true;
}
}
Future<void> clusterAllImages({ Future<void> clusterAllImages({
double minFaceScore = kMinimumQualityFaceScore, double minFaceScore = kMinimumQualityFaceScore,
bool clusterInBuckets = true, bool clusterInBuckets = true,
}) async { }) async {
if (!canRunMLController) { if (_cannotRunMLFunction()) return;
_logger
.info("MLController does not allow running ML, skipping clustering");
return;
}
if (isClusteringRunning) {
_logger.info("clusterAllImages is already running, skipping");
return;
}
// verify faces is enabled
if (LocalSettings.instance.isFaceIndexingEnabled == false) {
_logger.warning("clustering is disabled by user");
return;
}
final indexingCompleteRatio = await _getIndexedDoneRatio();
if (indexingCompleteRatio < 0.95) {
_logger.info(
"Indexing is not far enough, skipping clustering. Indexing is at $indexingCompleteRatio",
);
return;
}
_logger.info("`clusterAllImages()` called"); _logger.info("`clusterAllImages()` called");
isClusteringRunning = true; _isIndexingOrClusteringRunning = true;
final clusterAllImagesTime = DateTime.now(); final clusterAllImagesTime = DateTime.now();
try { try {
@ -441,7 +424,7 @@ class FaceMlService {
int bucket = 1; int bucket = 1;
while (true) { while (true) {
if (!canRunMLController) { if (_shouldPauseIndexingAndClustering) {
_logger.info( _logger.info(
"MLController does not allow running ML, stopping before clustering bucket $bucket", "MLController does not allow running ML, stopping before clustering bucket $bucket",
); );
@ -535,7 +518,8 @@ class FaceMlService {
} catch (e, s) { } catch (e, s) {
_logger.severe("`clusterAllImages` failed", e, s); _logger.severe("`clusterAllImages` failed", e, s);
} finally { } finally {
isClusteringRunning = false; _isIndexingOrClusteringRunning = false;
_shouldPauseIndexingAndClustering = false;
} }
} }
@ -543,17 +527,10 @@ class FaceMlService {
/// ///
/// This function first checks if the image has already been analyzed with the lastest faceMlVersion and stored in the database. If so, it skips the image. /// This function first checks if the image has already been analyzed with the lastest faceMlVersion and stored in the database. If so, it skips the image.
Future<void> indexAllImages({int retryFetchCount = 10}) async { Future<void> indexAllImages({int retryFetchCount = 10}) async {
if (isImageIndexRunning) { if (_cannotRunMLFunction()) return;
_logger.warning("indexAllImages is already running, skipping");
return;
}
// verify faces is enabled
if (LocalSettings.instance.isFaceIndexingEnabled == false) {
_logger.warning("indexing is disabled by user");
return;
}
try { try {
isImageIndexRunning = true; _isIndexingOrClusteringRunning = true;
_logger.info('starting image indexing'); _logger.info('starting image indexing');
final w = (kDebugMode ? EnteWatch('prepare indexing files') : null) final w = (kDebugMode ? EnteWatch('prepare indexing files') : null)
@ -629,7 +606,7 @@ class FaceMlService {
final List<Face> faces = []; final List<Face> faces = [];
final remoteFileIdToVersion = <int, int>{}; final remoteFileIdToVersion = <int, int>{};
for (FileMl fileMl in res.mlData.values) { for (FileMl fileMl in res.mlData.values) {
if (shouldDiscardRemoteEmbedding(fileMl)) continue; if (_shouldDiscardRemoteEmbedding(fileMl)) continue;
if (fileMl.faceEmbedding.faces.isEmpty) { if (fileMl.faceEmbedding.faces.isEmpty) {
faces.add( faces.add(
Face.empty( Face.empty(
@ -688,7 +665,7 @@ class FaceMlService {
final smallerChunks = chunk.chunks(_fileDownloadLimit); final smallerChunks = chunk.chunks(_fileDownloadLimit);
for (final smallestChunk in smallerChunks) { for (final smallestChunk in smallerChunks) {
for (final enteFile in smallestChunk) { for (final enteFile in smallestChunk) {
if (isImageIndexRunning == false) { if (_shouldPauseIndexingAndClustering) {
_logger.info("indexAllImages() was paused, stopping"); _logger.info("indexAllImages() was paused, stopping");
break outerLoop; break outerLoop;
} }
@ -712,16 +689,17 @@ class FaceMlService {
stopwatch.stop(); stopwatch.stop();
_logger.info( _logger.info(
"`indexAllImages()` finished. Analyzed $fileAnalyzedCount images, in ${stopwatch.elapsed.inSeconds} seconds (avg of ${stopwatch.elapsed.inSeconds / fileAnalyzedCount} seconds per image, skipped $fileSkippedCount images. MLController status: $canRunMLController)", "`indexAllImages()` finished. Analyzed $fileAnalyzedCount images, in ${stopwatch.elapsed.inSeconds} seconds (avg of ${stopwatch.elapsed.inSeconds / fileAnalyzedCount} seconds per image, skipped $fileSkippedCount images. MLController status: $_mlControllerStatus)",
); );
} catch (e, s) { } catch (e, s) {
_logger.severe("indexAllImages failed", e, s); _logger.severe("indexAllImages failed", e, s);
} finally { } finally {
isImageIndexRunning = false; _isIndexingOrClusteringRunning = false;
_shouldPauseIndexingAndClustering = false;
} }
} }
bool shouldDiscardRemoteEmbedding(FileMl fileMl) { bool _shouldDiscardRemoteEmbedding(FileMl fileMl) {
if (fileMl.faceEmbedding.version < faceMlVersion) { if (fileMl.faceEmbedding.version < faceMlVersion) {
debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} " debugPrint("Discarding remote embedding for fileID ${fileMl.fileID} "
"because version is ${fileMl.faceEmbedding.version} and we need $faceMlVersion"); "because version is ${fileMl.faceEmbedding.version} and we need $faceMlVersion");
@ -861,10 +839,6 @@ class FaceMlService {
} }
} }
void pauseIndexing() {
isImageIndexRunning = false;
}
/// Analyzes the given image data by running the full pipeline for faces, using [analyzeImageSync] in the isolate. /// Analyzes the given image data by running the full pipeline for faces, using [analyzeImageSync] in the isolate.
Future<FaceMlResult?> analyzeImageInSingleIsolate(EnteFile enteFile) async { Future<FaceMlResult?> analyzeImageInSingleIsolate(EnteFile enteFile) async {
_checkEnteFileForID(enteFile); _checkEnteFileForID(enteFile);
@ -1334,8 +1308,8 @@ class FaceMlService {
_logger.warning( _logger.warning(
'''Skipped analysis of image with enteFile, it might be the wrong format or has no uploadedFileID, or MLController doesn't allow it to run. '''Skipped analysis of image with enteFile, it might be the wrong format or has no uploadedFileID, or MLController doesn't allow it to run.
enteFile: ${enteFile.toString()} enteFile: ${enteFile.toString()}
isImageIndexRunning: $isImageIndexRunning isImageIndexRunning: $_isIndexingOrClusteringRunning
canRunML: $canRunMLController canRunML: $_mlControllerStatus
''', ''',
); );
throw CouldNotRetrieveAnyFileData(); throw CouldNotRetrieveAnyFileData();
@ -1361,7 +1335,8 @@ class FaceMlService {
} }
bool _skipAnalysisEnteFile(EnteFile enteFile, Map<int, int> indexedFileIds) { bool _skipAnalysisEnteFile(EnteFile enteFile, Map<int, int> indexedFileIds) {
if (isImageIndexRunning == false || canRunMLController == false) { if (_isIndexingOrClusteringRunning == false ||
_mlControllerStatus == false) {
return true; return true;
} }
// Skip if the file is not uploaded or not owned by the user // Skip if the file is not uploaded or not owned by the user
@ -1378,4 +1353,49 @@ class FaceMlService {
return indexedFileIds.containsKey(id) && return indexedFileIds.containsKey(id) &&
indexedFileIds[id]! >= faceMlVersion; indexedFileIds[id]! >= faceMlVersion;
} }
bool _cannotRunMLFunction({String function = ""}) {
if (_isIndexingOrClusteringRunning) {
_logger.info(
"Cannot run $function because indexing or clustering is already running",
);
_logStatus();
return true;
}
if (_mlControllerStatus == false) {
_logger.info(
"Cannot run $function because MLController does not allow it",
);
_logStatus();
return true;
}
if (debugIndexingDisabled) {
_logger.info(
"Cannot run $function because debugIndexingDisabled is true",
);
_logStatus();
return true;
}
if (_shouldPauseIndexingAndClustering) {
// This should ideally not be triggered, because one of the above should be triggered instead.
_logger.warning(
"Cannot run $function because indexing and clustering is being paused",
);
_logStatus();
return true;
}
return false;
}
void _logStatus() {
final String status = '''
isInternalUser: ${flagService.internalUser}
isFaceIndexingEnabled: ${LocalSettings.instance.isFaceIndexingEnabled}
canRunMLController: $_mlControllerStatus
isIndexingOrClusteringRunning: $_isIndexingOrClusteringRunning
debugIndexingDisabled: $debugIndexingDisabled
shouldSyncPeople: $_shouldSyncPeople
''';
_logger.info(status);
}
} }

View File

@ -79,7 +79,7 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
final isEnabled = final isEnabled =
await LocalSettings.instance.toggleFaceIndexing(); await LocalSettings.instance.toggleFaceIndexing();
if (!isEnabled) { if (!isEnabled) {
FaceMlService.instance.pauseIndexing(); FaceMlService.instance.pauseIndexingAndClustering();
} }
if (mounted) { if (mounted) {
setState(() {}); setState(() {});
@ -107,7 +107,7 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
setState(() {}); setState(() {});
} }
} catch (e, s) { } catch (e, s) {
_logger.warning('indexing failed ', e, s); _logger.warning('Remote fetch toggle failed ', e, s);
await showGenericErrorDialog(context: context, error: e); await showGenericErrorDialog(context: context, error: e);
} }
}, },
@ -115,22 +115,25 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
sectionOptionSpacing, sectionOptionSpacing,
MenuItemWidget( MenuItemWidget(
captionedTextWidget: CaptionedTextWidget( captionedTextWidget: CaptionedTextWidget(
title: FaceMlService.instance.canRunMLController title: FaceMlService.instance.debugIndexingDisabled
? "canRunML enabled" ? "Debug enable indexing again"
: "canRunML disabled", : "Debug disable indexing",
), ),
pressedColor: getEnteColorScheme(context).fillFaint, pressedColor: getEnteColorScheme(context).fillFaint,
trailingIcon: Icons.chevron_right_outlined, trailingIcon: Icons.chevron_right_outlined,
trailingIconIsMuted: true, trailingIconIsMuted: true,
onTap: () async { onTap: () async {
try { try {
FaceMlService.instance.canRunMLController = FaceMlService.instance.debugIndexingDisabled =
!FaceMlService.instance.canRunMLController; !FaceMlService.instance.debugIndexingDisabled;
if (FaceMlService.instance.debugIndexingDisabled) {
FaceMlService.instance.pauseIndexingAndClustering();
}
if (mounted) { if (mounted) {
setState(() {}); setState(() {});
} }
} catch (e, s) { } catch (e, s) {
_logger.warning('canRunML toggle failed ', e, s); _logger.warning('debugIndexingDisabled toggle failed ', e, s);
await showGenericErrorDialog(context: context, error: e); await showGenericErrorDialog(context: context, error: e);
} }
}, },
@ -145,6 +148,7 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
trailingIconIsMuted: true, trailingIconIsMuted: true,
onTap: () async { onTap: () async {
try { try {
FaceMlService.instance.debugIndexingDisabled = false;
unawaited(FaceMlService.instance.indexAndClusterAll()); unawaited(FaceMlService.instance.indexAndClusterAll());
} catch (e, s) { } catch (e, s) {
_logger.warning('indexAndClusterAll failed ', e, s); _logger.warning('indexAndClusterAll failed ', e, s);
@ -162,6 +166,7 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
trailingIconIsMuted: true, trailingIconIsMuted: true,
onTap: () async { onTap: () async {
try { try {
FaceMlService.instance.debugIndexingDisabled = false;
unawaited(FaceMlService.instance.indexAllImages()); unawaited(FaceMlService.instance.indexAllImages());
} catch (e, s) { } catch (e, s) {
_logger.warning('indexing failed ', e, s); _logger.warning('indexing failed ', e, s);
@ -189,6 +194,7 @@ class _FaceDebugSectionWidgetState extends State<FaceDebugSectionWidget> {
onTap: () async { onTap: () async {
try { try {
await PersonService.instance.storeRemoteFeedback(); await PersonService.instance.storeRemoteFeedback();
FaceMlService.instance.debugIndexingDisabled = false;
await FaceMlService.instance await FaceMlService.instance
.clusterAllImages(clusterInBuckets: true); .clusterAllImages(clusterInBuckets: true);
Bus.instance.fire(PeopleChangedEvent()); Bus.instance.fire(PeopleChangedEvent());

View File

@ -208,7 +208,7 @@ class _MachineLearningSettingsPageState
if (isEnabled) { if (isEnabled) {
unawaited(FaceMlService.instance.ensureInitialized()); unawaited(FaceMlService.instance.ensureInitialized());
} else { } else {
FaceMlService.instance.pauseIndexing(); FaceMlService.instance.pauseIndexingAndClustering();
} }
if (mounted) { if (mounted) {
setState(() {}); setState(() {});