diff --git a/lib/sftp_worker.dart b/lib/sftp_worker.dart index 0221c94..c6aaaae 100644 --- a/lib/sftp_worker.dart +++ b/lib/sftp_worker.dart @@ -109,6 +109,66 @@ class SftpWorker { static void _sftpCmdHandler(SendPort sendPort, ReceivePort receivePort, SftpClient sftpClient) { + + final StreamController<(int, DownloadFile)> downloadController = StreamController(); + downloadController.stream.asyncMap((cmd) async { + final (int id, DownloadFile downloadCmd) = cmd; + try { + final localFile = File('${downloadCmd.downloadPath}/${downloadCmd.file.filename}'); + if (await localFile.exists()) { + sendPort.send((id, RemoteError('File Already Exists', ''))); + return; + } + final localFileWriter = await localFile.open(mode: FileMode.write); + final remoteFile = await sftpClient.open('${downloadCmd.path}${downloadCmd.file.filename}'); + final fileSize = downloadCmd.file.attr.size!; + bool timeout = true; + await for (final bytes in remoteFile.read( + onProgress: (progress) { + if (timeout) { + timeout = false; + sendPort.send((id, progress/fileSize)); + Future.delayed(Duration(seconds: 2), () => timeout = true); + } + } + )) { + await localFileWriter.writeFrom(bytes); + } + } + on SftpStatusError catch (e) { + sendPort.send((id, RemoteError(e.message, ''))); + } + sendPort.send((id, 1.0)); + }).listen((_) {}); + + final StreamController<(int, UploadFile)> uploadController = StreamController(); + uploadController.stream.asyncMap((cmd) async { + final (int id, UploadFile uploadCmd) = cmd; + try { + final file = File(uploadCmd.filePath); + final fileSize = await file.length(); + final remoteFile = await sftpClient.open( + '${uploadCmd.path}${basename(uploadCmd.filePath)}', + mode: SftpFileOpenMode.create | SftpFileOpenMode.write | SftpFileOpenMode.exclusive + ); + bool timeout = true; + await remoteFile.write( + file.openRead().cast(), + onProgress: (progress) { + if (timeout) { + timeout = false; + sendPort.send((id, progress/fileSize)); + Future.delayed(Duration(seconds: 2), () => timeout = true); + } + } + ); + } + on SftpStatusError catch (e) { + sendPort.send((id, RemoteError(e.message, ''))); + } + sendPort.send((id, 1.0)); + }).listen((_) {}); + receivePort.listen((message) async { final (int id, SftpCommand command) = message; switch (command) { @@ -120,30 +180,8 @@ class SftpWorker { on SftpStatusError catch (e) { sendPort.send((id, RemoteError(e.message, ''))); } - case UploadFile(:final path, :final filePath): - try { - final file = File(filePath); - final fileSize = await file.length(); - final remoteFile = await sftpClient.open( - '$path${basename(filePath)}', - mode: SftpFileOpenMode.create | SftpFileOpenMode.write | SftpFileOpenMode.exclusive - ); - bool timeout = true; - await remoteFile.write( - file.openRead().cast(), - onProgress: (progress) { - if (timeout) { - timeout = false; - sendPort.send((id, progress/fileSize)); - Future.delayed(Duration(seconds: 2), () => timeout = true); - } - } - ); - } - on SftpStatusError catch (e) { - sendPort.send((id, RemoteError(e.message, ''))); - } - sendPort.send((id, 1.0)); + case UploadFile(): + uploadController.add((id, command)); case MkDir(:final path): try { await sftpClient.mkdir(path); @@ -187,33 +225,8 @@ class SftpWorker { on SftpStatusError catch (e) { sendPort.send((id, RemoteError(e.message, ''))); } - case DownloadFile(:final file, :final path, :final downloadPath): - try { - final localFile = File('$downloadPath/${file.filename}'); - if (await localFile.exists()) { - sendPort.send((id, RemoteError('File Already Exists', ''))); - break; - } - final localFileWriter = await localFile.open(mode: FileMode.write); - final remoteFile = await sftpClient.open('$path${file.filename}'); - final fileSize = file.attr.size!; - bool timeout = true; - await for (final bytes in remoteFile.read( - onProgress: (progress) { - if (timeout) { - timeout = false; - sendPort.send((id, progress/fileSize)); - Future.delayed(Duration(seconds: 2), () => timeout = true); - } - } - )) { - await localFileWriter.writeFrom(bytes); - } - } - on SftpStatusError catch (e) { - sendPort.send((id, RemoteError(e.message, ''))); - } - sendPort.send((id, 1.0)); + case DownloadFile(): + downloadController.add((id, command)); } }); }