A fairly big rework of the code to split the network handling code from the speech handling code. Also, introduce a thread for reading from the socket and a separate thread for writing to the socket. That way, disconnections made by the NDA are handled correctly.

This commit is contained in:
Jeremy Rand 2022-03-15 23:58:04 -04:00
parent de1cab8207
commit 923d0bf967
6 changed files with 428 additions and 265 deletions

View File

@ -7,6 +7,7 @@
objects = {
/* Begin PBXBuildFile section */
9D05BAAA27DFDE6300D9CC4B /* GSConnection.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9D05BAA927DFDE6300D9CC4B /* GSConnection.swift */; };
9D5155F326A1EF7B0075EBC7 /* ListenerGSApp.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9D5155F226A1EF7B0075EBC7 /* ListenerGSApp.swift */; };
9D5155F726A1EF7C0075EBC7 /* Assets.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 9D5155F626A1EF7C0075EBC7 /* Assets.xcassets */; };
9D5155FA26A1EF7C0075EBC7 /* Preview Assets.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 9D5155F926A1EF7C0075EBC7 /* Preview Assets.xcassets */; };
@ -48,6 +49,7 @@
/* End PBXContainerItemProxy section */
/* Begin PBXFileReference section */
9D05BAA927DFDE6300D9CC4B /* GSConnection.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = GSConnection.swift; sourceTree = "<group>"; };
9D0DC15826F2E47A007EB92D /* ListenerGS.entitlements */ = {isa = PBXFileReference; lastKnownFileType = text.plist.entitlements; path = ListenerGS.entitlements; sourceTree = "<group>"; };
9D5155EF26A1EF7B0075EBC7 /* ListenerGS.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = ListenerGS.app; sourceTree = BUILT_PRODUCTS_DIR; };
9D5155F226A1EF7B0075EBC7 /* ListenerGSApp.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ListenerGSApp.swift; sourceTree = "<group>"; };
@ -141,6 +143,7 @@
9DCCDACB271FB87100F311DF /* GSDestinations.swift */,
9DD8905F2772D3B20084A894 /* GSView.swift */,
9D6ED239271E6BD600D773CD /* SpeechForwarder.swift */,
9D05BAA927DFDE6300D9CC4B /* GSConnection.swift */,
9DD8905E27726C140084A894 /* ListenerGS Icon.pxm */,
9D51566326A36F530075EBC7 /* BinUtils */,
9D51563626A36AD60075EBC7 /* SwiftSocket */,
@ -365,6 +368,7 @@
buildActionMask = 2147483647;
files = (
9D51565726A36B410075EBC7 /* TCPClient.swift in Sources */,
9D05BAAA27DFDE6300D9CC4B /* GSConnection.swift in Sources */,
9D6F27092728EF410089585E /* MainView.swift in Sources */,
9D51565526A36B410075EBC7 /* UDPClient.swift in Sources */,
9DD67CF02728F5B700243FC6 /* DestinationsView.swift in Sources */,

View File

@ -0,0 +1,346 @@
//
// GSConnection.swift
// ListenerGS
//
// Created by Jeremy Rand on 2022-03-14.
//
import Foundation
import os
/// A titled error message published by `GSConnection` for presentation to the user.
///
/// Conforms to `Identifiable` so it can drive an item-based alert.
struct GSConnectionErrorMessage: Identifiable {
    /// Short heading for the error.
    let title: String

    /// Longer, human-readable description of what went wrong.
    let message: String

    /// The message text doubles as the identity, so two errors with the
    /// same `message` are considered the same item.
    var id: String {
        return message
    }
}
/// The states a connection to the GS moves through.
///
/// - `disconnected`:  no connection is open.
/// - `connecting`:    a connection attempt has been started but has not finished.
/// - `connected`:     the connection is open and idle.
/// - `listening`:     speech is being captured and forwarded.
/// - `stoplistening`: listening has been asked to stop and is winding down
///                    before returning to `connected`.
enum GSConnectionState {
    case disconnected, connecting, connected, listening, stoplistening
}
// MARK: - CustomStringConvertible

extension GSConnectionState: CustomStringConvertible {
    /// Lower-case, human-readable name of the state, used when a state is
    /// interpolated into log lines and error messages.
    var description: String {
        switch self {
        case .disconnected:  return "disconnected"
        case .connecting:    return "connecting"
        case .connected:     return "connected"
        case .listening:     return "listening"
        case .stoplistening: return "stop listening"
        }
    }
}
/// The speech-capture operations a connection needs from whatever is
/// producing text for it.
protocol SpeechForwarderProtocol {
    /// Begins capturing speech.
    /// - Returns: `true` if capture was started, `false` if it could not be.
    func startListening() -> Bool

    /// Ends speech capture.
    func stopListening()
}
/// Owns the TCP connection to the GS and forwards recognised text over it.
///
/// Two operation queues are used: `readQueue` runs `doConnect()`, which
/// connects and then blocks reading messages from the socket, and
/// `writeQueue` runs the operation started by `listen(speechForwarder:)`,
/// which sends text. They coordinate through `condition`, which guards
/// `canSend` and is signalled whenever the text, the listening state or the
/// socket changes.
class GSConnection : ObservableObject {
    /// Current connection state; only changed through `changeState(newState:)`.
    @Published var state = GSConnectionState.disconnected
    /// The most recent text handed to `set(text:)`.
    @Published var textHeard = ""
    /// The most recent error to show to the user, if any.
    @Published var errorMessage : GSConnectionErrorMessage?

    /// The forwarder currently capturing speech; set while listening.
    var speechForwarder : SpeechForwarderProtocol?

    // Message type codes; each is packed as the first 16-bit field of a message.
    let LISTEN_STATE_MSG = 1
    let LISTEN_TEXT_MSG = 2
    let LISTEN_SEND_MORE = 3

    /// TCP port used when connecting to the destination.
    let port = 19026

    private var destination = ""
    private var client: TCPClient?

    private let logger = Logger()

    private let readQueue = OperationQueue()
    private let writeQueue = OperationQueue()

    private var condition = NSCondition()
    // Not read or written anywhere else in this class.
    private var stopListeningFlag = false
    // True when the remote side has asked for more text (LISTEN_SEND_MORE)
    // since the last text message was sent. Guarded by `condition`.
    private var canSend = true

    /// Moves to `newState` if that is a legal transition from the current state.
    ///
    /// A request for the current state is ignored. An illegal request is
    /// logged and reported through `errorOccurred(title:message:)`, and the
    /// state is left unchanged.
    ///
    /// - Parameter newState: The state to move to.
    func changeState(newState : GSConnectionState) {
        if (state == newState) {
            return
        }

        let legalTransition : Bool
        switch (newState) {
        case .disconnected:
            // Every other state may drop to disconnected. In particular,
            // disconnect() while listening first moves to .stoplistening; if
            // that could not then become .disconnected the state would stay
            // .stoplistening and the write operation would later flip it to
            // .connected even though the socket is already closed.
            legalTransition = true
        case .connecting:
            legalTransition = (state == .disconnected)
        case .connected:
            legalTransition = ((state == .connecting) || (state == .listening) || (state == .stoplistening))
        case .listening:
            legalTransition = (state == .connected)
        case .stoplistening:
            legalTransition = ((state == .connected) || (state == .listening))
        }

        guard legalTransition else {
            logger.error("Illegal requested state transition from \(self.state) to \(newState)")
            errorOccurred(title: "Bad State Change", message: "Illegal state transition from \(self.state) to \(newState)")
            return
        }
        state = newState
    }

    /// Publishes an error for the UI. The assignment is queued on the main
    /// operation queue, so this can be called from any queue.
    ///
    /// - Parameters:
    ///   - title: Short heading for the error.
    ///   - message: Description of the error.
    func errorOccurred(title: String, message : String) {
        OperationQueue.main.addOperation {
            self.errorMessage = GSConnectionErrorMessage(title: title, message: message)
        }
    }

    /// Reports a failed connection attempt and returns to `.disconnected`.
    private func connectionFailed() {
        errorOccurred(title: "Connect Error", message: "Failed to connect to \(destination)")
        changeState(newState:.disconnected)
    }

    /// Records a successful connection attempt by moving to `.connected`.
    private func connectionSuccessful() {
        changeState(newState:.connected)
        logger.debug("Connected to \(self.destination)")
    }

    /// Connects to `destination` and then reads messages until the socket
    /// fails or an unexpected message arrives. Runs on `readQueue`.
    ///
    /// Each LISTEN_SEND_MORE message received sets `canSend` and wakes the
    /// writer. When the read loop ends the socket is closed and
    /// `disconnect()` is queued on the main queue.
    private func doConnect() {
        logger.debug("Attempting to connect to \(self.destination)")
        client = TCPClient(address: destination, port: Int32(port))
        guard let client = client else {
            OperationQueue.main.addOperation { self.connectionFailed() }
            return
        }

        switch client.connect(timeout: 10) {
        case .success:
            OperationQueue.main.addOperation { self.connectionSuccessful() }
        case .failure(let error):
            client.close()
            self.client = nil
            logger.error("Failed to connect to \(self.destination): \(String(describing: error))")
            OperationQueue.main.addOperation { self.connectionFailed() }
            return
        }

        while (true) {
            // A nil read ends the loop; this is how a remote disconnection is noticed.
            guard let byteArray = client.read(2) else {
                break
            }
            let data = Data(byteArray)
            do {
                let unpacked = try unpack("<h", data)
                if (unpacked[0] as? Int == LISTEN_SEND_MORE) {
                    condition.lock()
                    canSend = true
                    condition.broadcast()
                    condition.unlock()
                } else {
                    logger.error("Unexpected message on socket from \(self.destination)")
                    errorOccurred(title: "Protocol Error", message: "Unexpected message from the GS")
                    break
                }
            }
            catch {
                logger.error("Unable to unpack message on socket from \(self.destination)")
                errorOccurred(title: "Protocol Error", message: "Unexpected message from the GS")
                break
            }
        }

        client.close()
        self.client = nil
        OperationQueue.main.addOperation { self.disconnect() }
    }

    /// Starts connecting to `destination` in the background.
    ///
    /// The state moves to `.connecting` immediately; the outcome is reported
    /// later through a state change to `.connected` or `.disconnected`.
    ///
    /// - Parameter destination: Address passed to `TCPClient`.
    func connect(destination : String) {
        self.destination = destination
        changeState(newState: .connecting)
        readQueue.addOperation {
            self.doConnect()
        }
    }

    deinit {
        disconnect()
    }

    /// Stops listening if necessary, closes the socket and moves to
    /// `.disconnected`.
    ///
    /// Calling this when already disconnected is harmless: there is no
    /// socket to close and `changeState` ignores a request for the current
    /// state.
    func disconnect() {
        if (state == .listening) {
            stopListening()
        }

        condition.lock()
        client?.close()
        client = nil
        // Wake the writer so it notices the socket is gone and returns.
        condition.broadcast()
        condition.unlock()

        changeState(newState:.disconnected)
    }

    /// Stops speech capture and, if currently listening, moves to
    /// `.stoplistening` and wakes the writer so it can finish.
    func stopListening() {
        logger.debug("Stopped listening")
        if let speechForwarder = speechForwarder {
            speechForwarder.stopListening()
            self.speechForwarder = nil
        }

        condition.lock()
        if (state == .listening) {
            changeState(newState: .stoplistening)
            condition.broadcast()
        }
        condition.unlock()
    }

    /// Sends a LISTEN_STATE_MSG telling the remote side whether listening is
    /// starting or stopping.
    ///
    /// - Parameter isListening: `true` when listening starts, `false` when it ends.
    /// - Returns: `true` if the message was sent; `false` if there is no
    ///   socket or the send failed.
    private func sendListenMsg(isListening: Bool) -> Bool {
        guard let client = client else { return false }

        if case .failure(let error) = client.send(data: pack("<hh", [LISTEN_STATE_MSG, isListening ? 1 : 0])) {
            self.logger.error("Unable to send header: \(String(describing: error))")
            return false
        }
        return true
    }

    /// Starts a listening session on `writeQueue`.
    ///
    /// The write operation announces that listening has started, queues the
    /// switch to `.listening` and the start of speech capture on the main
    /// queue, sends text until listening stops or the socket goes away, then
    /// announces that listening has ended and queues the return to
    /// `.connected`.
    ///
    /// - Parameter speechForwarder: The forwarder to start and to keep for
    ///   the duration of the session.
    func listen(speechForwarder: SpeechForwarderProtocol) {
        textHeard = ""
        writeQueue.addOperation {
            if (!self.sendListenMsg(isListening: true)) {
                self.errorOccurred(title: "Write Error", message: "Unable to send data to the GS")
                return
            }

            OperationQueue.main.addOperation {
                self.changeState(newState: .listening)
                if (!speechForwarder.startListening()) {
                    self.logger.error("Unable to start listening")
                    self.errorOccurred(title: "Speech Error", message: "Unable to start listening for speech")
                    self.stopListening()
                    return
                }
                self.speechForwarder = speechForwarder
            }

            self.send()

            // Best effort: the socket may already be closed at this point.
            _ = self.sendListenMsg(isListening: false)

            OperationQueue.main.addOperation {
                if (self.state == .stoplistening) {
                    self.changeState(newState: .connected)
                }
            }
        }
    }

    /// Records the latest recognised text and wakes the writer.
    ///
    /// - Parameter text: The full text heard so far in this session.
    func set(text:String) {
        condition.lock()
        textHeard = text
        condition.broadcast()
        condition.unlock()
    }

    /// Writer loop: sends `textHeard` whenever it differs from what was last
    /// sent and the remote side is ready for more.
    ///
    /// Returns when the socket is gone, or when listening is stopping and
    /// everything heard has been sent.
    private func send() {
        var stringLastSent = ""
        var stringToSend = ""

        while true {
            condition.lock()
            guard client != nil else {
                condition.unlock()
                return
            }
            if ((stringLastSent == textHeard) && (state == .stoplistening)) {
                condition.unlock()
                return
            }
            // Nothing new to send, or still waiting for LISTEN_SEND_MORE:
            // sleep until something changes, then re-evaluate from the top.
            if ((!canSend) ||
                (stringLastSent == textHeard)) {
                condition.wait()
                condition.unlock()
                continue
            }
            stringToSend = textHeard
            condition.unlock()

            if send(latestText: stringToSend, lastSent: stringLastSent) {
                stringLastSent = stringToSend
                condition.lock()
                canSend = false
                condition.unlock()
            }
        }
    }

    /// Sends the edit that turns `lastSent` into `latestText`: one delete
    /// character (0x7f) for each character of `lastSent` past the common
    /// prefix, followed by the new tail of `latestText` with newlines
    /// replaced by carriage returns.
    ///
    /// - Parameters:
    ///   - latestText: The text the remote side should end up with.
    ///   - lastSent: The text the remote side currently has.
    /// - Returns: `true` if the caller should treat `latestText` as sent and
    ///   wait for the next LISTEN_SEND_MORE; `false` if there is no socket,
    ///   nothing needed sending, or a send failed.
    private func send(latestText : String, lastSent: String) -> Bool {
        guard let client = client else { return false }

        // Length, in characters, of the longest common prefix.
        let commonChars = zip(latestText, lastSent).prefix(while: { pair in pair.0 == pair.1 }).count

        var stringToSend = ""
        if (commonChars < lastSent.count) {
            stringToSend = String(repeating: "\u{7f}", count: lastSent.count - commonChars)
        }
        stringToSend.append(contentsOf: latestText.suffix(latestText.count - commonChars).replacingOccurrences(of: "\n", with: "\r"))

        if stringToSend.isEmpty {
            return false
        }

        // JSR_TODO - Handle strings to send that are longer than 64K (doubt that would happen though)
        let nsEnc = CFStringConvertEncodingToNSStringEncoding(CFStringEncoding(CFStringBuiltInEncodings.macRoman.rawValue))
        let encoding = String.Encoding(rawValue: nsEnc)  // String.Encoding
        guard let bytes = stringToSend.data(using: encoding) else {
            // NOTE(review): text that cannot be encoded as MacRoman is
            // reported as sent although nothing went out, so the caller then
            // waits for a LISTEN_SEND_MORE that the remote side has no reason
            // to send. Behaviour kept as-is; confirm the intended handling.
            return true
        }

        if case .failure(let error) = client.send(data: pack("<hh", [LISTEN_TEXT_MSG, bytes.count])) {
            OperationQueue.main.addOperation {
                self.errorOccurred(title: "Write Error", message: "Unable to send text to the GS")
                self.stopListening()
            }
            logger.error("Failed to send text: \(String(describing: error))")
            // A failed header means nothing was delivered, so do not let the
            // caller record this text as sent.
            return false
        }

        if case .failure(let error) = client.send(data: bytes) {
            OperationQueue.main.addOperation {
                self.errorOccurred(title: "Write Error", message: "Unable to send text to the GS")
                self.stopListening()
            }
            logger.error("Failed to send text: \(String(describing: error))")
            return false
        }

        logger.debug("Sent text \"\(stringToSend)\"")
        return true
    }
}

View File

@ -38,36 +38,61 @@ private extension GSButtonStyle {
struct GSView: View {
private let ipAddress : String
@StateObject private var speechForwarder = SpeechForwarder()
@StateObject private var connection = GSConnection()
var body: some View {
VStack {
VStack {
Button(speechForwarder.connected ?
"\(Image(systemName: "desktopcomputer.trianglebadge.exclamationmark")) Disconnect from \(ipAddress)" :
"\(Image(systemName: "desktopcomputer.and.arrow.down")) Connect to \(ipAddress)") {
if (speechForwarder.connected) {
speechForwarder.disconnect()
} else {
speechForwarder.connect(destination: ipAddress)
switch (connection.state) {
case .disconnected:
Button("\(Image(systemName: "desktopcomputer.and.arrow.down")) Connect to \(ipAddress)") {
connection.connect(destination: ipAddress)
}
.buttonStyle(GSButtonStyle())
case .connecting:
Button("\(Image(systemName: "desktopcomputer.and.arrow.down")) Connecting to \(ipAddress)") {
}
.disabled(true)
.buttonStyle(GSButtonStyle())
case .connected, .listening, .stoplistening:
Button("\(Image(systemName: "desktopcomputer.trianglebadge.exclamationmark")) Disconnect from \(ipAddress)") {
connection.disconnect()
}
.disabled(connection.state != .connected)
.buttonStyle(GSButtonStyle())
}
.disabled(speechForwarder.connecting)
.buttonStyle(GSButtonStyle())
Button(speechForwarder.listening ?
"\(Image(systemName: "ear.trianglebadge.exclamationmark")) Stop Listening" :
"\(Image(systemName: "ear.and.waveform")) Listen and Send Text") {
speechForwarder.listen()
switch (connection.state)
{
case .disconnected, .stoplistening, .connecting:
Button("\(Image(systemName: "ear.and.waveform")) Listen and Send Text") {
}
.disabled(true)
.buttonStyle(GSButtonStyle())
case .connected:
Button("\(Image(systemName: "ear.and.waveform")) Listen and Send Text") {
connection.listen(speechForwarder: SpeechForwarder(connection: connection))
}
.buttonStyle(GSButtonStyle())
case .listening:
Button("\(Image(systemName: "ear.trianglebadge.exclamationmark")) Stop Listening") {
connection.stopListening()
}
.buttonStyle(GSButtonStyle())
}
.disabled((!speechForwarder.connected) || (!speechForwarder.listening && speechForwarder.sending))
.buttonStyle(GSButtonStyle())
}
.fixedSize(horizontal: true, vertical: false)
.navigationBarTitle(ipAddress)
}
.alert(item: $connection.errorMessage) { errorMessage in
Alert(title:Text(errorMessage.title), message: Text(errorMessage.message))
}
Text(speechForwarder.textHeard)
Text(connection.textHeard)
.truncationMode(.head)
.lineLimit(15)
.padding()

View File

@ -19,7 +19,7 @@
<key>CFBundleShortVersionString</key>
<string>1.0</string>
<key>CFBundleVersion</key>
<string>445</string>
<string>483</string>
<key>LSApplicationCategoryType</key>
<string>public.app-category.utilities</string>
<key>LSRequiresIPhoneOS</key>

View File

@ -9,19 +9,9 @@ import Foundation
import os
import Speech
class SpeechForwarder : ObservableObject {
@Published var listening = false
@Published var connected = false
@Published var connecting = false
@Published var textHeard = ""
@Published var sending = false
class SpeechForwarder : SpeechForwarderProtocol {
let LISTEN_STATE_MSG = 1
let LISTEN_TEXT_MSG = 2
let LISTEN_SEND_MORE = 3
let port = 19026
private var client: TCPClient?
private var connection : GSConnection
private let speechRecognizer = SFSpeechRecognizer(locale: Locale(identifier: Locale.preferredLanguages[0]))!
@ -33,234 +23,47 @@ class SpeechForwarder : ObservableObject {
private let logger = Logger()
private let queue = OperationQueue()
private var condition = NSCondition()
private var latestText = ""
func connect(destination : String) {
connecting = true
queue.addOperation {
self.logger.debug("Attempting to connect to \(destination)")
self.client = TCPClient(address: destination, port: Int32(self.port))
guard let client = self.client else {
OperationQueue.main.addOperation { self.connecting = false }
return
}
switch client.connect(timeout: 10) {
case .success:
OperationQueue.main.addOperation { self.connected = true }
self.logger.debug("Connected to \(destination)")
case .failure(let error):
client.close()
self.client = nil
self.logger.error("Failed to connect to \(destination): \(String(describing: error))")
break
}
OperationQueue.main.addOperation { self.connecting = false }
}
init(connection : GSConnection) {
self.connection = connection
}
func disconnect() {
if (listening) {
listen()
}
guard let client = client else { return }
condition.lock()
client.close()
self.client = nil
condition.broadcast()
condition.unlock()
connected = false
}
func listen() {
self.listening.toggle()
if (self.listening) {
SFSpeechRecognizer.requestAuthorization { authStatus in
// The authorization status results in changes to the
// apps interface, so process the results on the apps
// main queue.
OperationQueue.main.addOperation {
switch authStatus {
case .authorized:
break
case .denied:
self.listening = false
break
case .restricted:
self.listening = false
break
case .notDetermined:
self.listening = false
break
default:
self.listening = false
break
}
}
}
}
guard let client = client else { return }
if (self.listening) {
switch (client.send(data: isListening())) {
case .success:
func startListening() -> Bool {
SFSpeechRecognizer.requestAuthorization { authStatus in
OperationQueue.main.addOperation {
switch authStatus {
case .authorized:
break
case .failure(let error):
self.listening = false
logger.error("Unable to send header: \(String(describing: error))")
case .denied, .restricted, .notDetermined:
self.connection.stopListening()
default:
self.connection.stopListening()
}
}
}
if (self.listening) {
do {
try startRecording()
logger.debug("Started listening")
}
catch {
self.listening = false
}
}
if (!self.listening) {
logger.debug("Stopped listening")
recognitionRequest?.endAudio()
audioEngine.stop()
audioEngine.inputNode.removeTap(onBus: 0)
recognitionTask?.cancel()
self.recognitionRequest = nil
self.recognitionTask = nil
condition.lock()
self.listening = false
condition.broadcast()
condition.unlock()
switch (client.send(data: isListening())) {
case .success:
break
case .failure(let error):
logger.error("Failed to send header: \(String(describing: error))")
}
}
}
private func isListening() -> Data {
return pack("<hh", [LISTEN_STATE_MSG, listening ? 1 : 0])
}
private func send() {
var stringLastSent = ""
var stringToSend = ""
var canSend = true
while true {
while (!canSend) {
logger.debug("Cannot send")
guard let client = client else {
logger.debug("Returning because client gone")
return
}
guard let byteArray = client.read(2, timeout: 1) else {
logger.debug("Did not read data")
continue
}
let data = Data(byteArray)
do {
let unpacked = try unpack("<h", data)
canSend = (unpacked[0] as? Int == LISTEN_SEND_MORE)
logger.debug("Updated canSend")
}
catch {
logger.debug("Unpack failed")
continue
}
}
logger.debug("Can send")
condition.lock()
while (stringLastSent == latestText) {
if (!self.listening) {
condition.unlock()
return
}
condition.wait()
if (!self.listening) {
condition.unlock()
return
}
guard client != nil else {
condition.unlock()
return
}
}
stringToSend = latestText
condition.unlock()
if send(latestText: stringToSend, lastSent: stringLastSent) {
stringLastSent = stringToSend
canSend = false
}
do {
try startRecording()
logger.debug("Started listening")
}
}
private func send(latestText : String, lastSent: String) -> Bool {
guard let client = client else { return false }
var commonChars = lastSent.count
while (commonChars > 0) {
if (latestText.prefix(commonChars) == lastSent.prefix(commonChars)) {
break
}
commonChars -= 1
}
var stringToSend = ""
if (commonChars < lastSent.count) {
stringToSend = String(repeating: "\u{7f}", count: lastSent.count - commonChars)
}
stringToSend.append(contentsOf: latestText.suffix(latestText.count - commonChars).replacingOccurrences(of: "\n", with: "\r"))
if (stringToSend.count == 0) {
catch {
return false
}
// JSR_TODO - Handle strings to send that are longer than 64K (doubt that would happen though)
let nsEnc = CFStringConvertEncodingToNSStringEncoding(CFStringEncoding(CFStringBuiltInEncodings.macRoman.rawValue))
let encoding = String.Encoding(rawValue: nsEnc) // String.Encoding
if let bytes = stringToSend.data(using: encoding) {
switch (client.send(data: pack("<hh", [LISTEN_TEXT_MSG, bytes.count]))) {
case .success:
switch (client.send(data: bytes)) {
case .success:
logger.debug("Sent text \"\(stringToSend)\"")
break
case .failure(let error):
OperationQueue.main.addOperation {
if (self.listening) {
self.listen()
}
}
logger.error("Failed to send text: \(String(describing: error))")
return false
}
case .failure(let error):
OperationQueue.main.addOperation {
if (self.listening) {
self.listen()
}
}
logger.error("Failed to send text: \(String(describing: error))")
}
}
return true
}
func stopListening() {
logger.debug("Stopped listening")
recognitionRequest?.endAudio()
audioEngine.stop()
audioEngine.inputNode.removeTap(onBus: 0)
recognitionTask?.cancel()
recognitionRequest = nil
recognitionTask = nil
}
private func startRecording() throws {
// Cancel the previous task if it's running.
@ -285,15 +88,6 @@ class SpeechForwarder : ObservableObject {
recognitionRequest.shouldReportPartialResults = true
recognitionRequest.requiresOnDeviceRecognition = false
self.textHeard = ""
self.latestText = ""
self.sending = true
queue.addOperation {
self.send()
OperationQueue.main.addOperation { self.sending = false }
}
// Create a recognition task for the speech recognition session.
// Keep a reference to the task so that it can be canceled.
recognitionTask = speechRecognizer.recognitionTask(with: recognitionRequest) { result, error in
@ -301,25 +95,18 @@ class SpeechForwarder : ObservableObject {
if let result = result {
// Update the text view with the results.
self.condition.lock()
self.latestText = result.bestTranscription.formattedString
self.condition.broadcast()
self.condition.unlock()
OperationQueue.main.addOperation { self.textHeard = result.bestTranscription.formattedString }
OperationQueue.main.addOperation { self.connection.set(text: result.bestTranscription.formattedString) }
isFinal = result.isFinal
}
if error != nil {
self.logger.error("Error from recognizer: \(String(describing: error))")
self.connection.errorOccurred(title: "Recognizer Error", message: "Speech recognizer failed with an error")
}
if error != nil || isFinal {
OperationQueue.main.addOperation {
if (self.listening) {
self.listen()
}
self.connection.stopListening()
}
}
}

View File

@ -120,6 +120,7 @@ int ytcpsocket_pull(int socketfd, char *data, int len, int timeout_sec) {
return ret; // select-call failed or timeout occurred (before anything was sent)
}
}
ytcpsocket_set_block(socketfd, (timeout_sec < 0));
// use loop to make sure receive all data
do {
readlen = (int)read(socketfd, data + datalen, len - datalen);