Read/Write & connection problems with I/O streams and URLSessionStreamTask

Context

I am trying to create a communication channel based on Bonjour NetService and URLSessionStreamTask to open the I/O stream.

I have the setup ready and I am able to connect with bonjour service and open I/O streams between mac and iOS app.

I am using swift codables for request and response, so communication is based on codable structs encoded as Data. I send a Request struct (by outputStream.write(data)) and wait for a response struct from mac app in input stream.

Problem

I am sending fetchUser request (after receiving response for loginUser), the mac app receives the request and writes the response without write failing (-1) but I'm not receiving the response in iOS app. (Although I received the response for previous loginUser request)

I'm not able to figure out where exactly the issue is. Attaching the code snippets here.

PS: I am noob at IO streams handling so a detailed response will be helpful. May be some sources where I can understand more about the topic.


let service: NetService = NetService(domain: "local.", type: "_my-app._tcp.", name: "test", port: 0)
service.delegate = self
service.publish(options: .listenForConnections)
//Writes response to connected output stream
func send(response: HalloResponse) {
do {
let data = try encoder.encode(response)
print("HalloServer: Response: \(String(describing: String(data: data, encoding: .utf8)))")
if serviceDelegate.dataStream?.outputSteam.write(data: data) == -1 {
print("HalloServer: send(response: HalloResponse) Write failied")
}
} catch {
print("HalloServer: Exception in send(request: Request)")
}
}
//NetServiceDelegate
func netService(_ sender: NetService, didAcceptConnectionWith inputStream: InputStream, outputStream: OutputStream) {
print("NetServiceDelegate: service - \(sender.name) inputStream - \(inputStream) outputStream \(outputStream)")
self.inputStream = inputStream
self.outputSteam = outputSteam
self.inputStream.delegate = self
self.outputSteam.delegate = self
self.inputStream.schedule(in: .main, forMode: .default)
self.inputStream.schedule(in: .main, forMode: .default)
self.inputStream.open()
self.inputStream.open()
}
// StreamDelegate
func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
print("StreamDelegate: handle eventCode: \(eventCode.rawValue)")
if inputStream == aStream {
switch eventCode {
case .hasBytesAvailable:
var data = Data()
guard inputStream.read(data: &data) > 0 else { return }
print("HalloDataStream: Recieved data - \(String(describing: String(data: data, encoding: .utf8)))")
let decoder = JSONDecoder()
if let request = try? decoder.decode(Request.self, from: data) {
delegate?.didReceive(request: request)
}
if let response = try? decoder.decode(HalloResponse.self, from: data) {
delegate?.didReceive(response: response)
}
default: break
}
}
}


iOS App code


serviceBrowser.searchForServices(ofType: "_my-app._tcp.", inDomain: "local.")
func connect(with service: NetService, completion: @escaping DeviceConnectionCompletion) {
deviceCompletion = completion
let config = URLSessionConfiguration.default
config.requestCachePolicy = .reloadIgnoringLocalCacheData
let session = URLSession(configuration: config, delegate: self, delegateQueue: .main)
streamTask = session.streamTask(with: service)
streamTask?.resume()
streamTask?.captureStreams()
}
func send(request: Request) {
do {
let data = try encoder.encode(request)
print("HalloClient: Request: \(String(describing: String(data: data, encoding: .utf8)))")
if dataStream?.outputSteam.write(data: data) == -1 {
print("HalloClient: send(request: Request) Write failied")
}
} catch {
print("HalloClient: Exception in send(request: Request)")
}
}
// URLSessionStreamDelegate
func urlSession(
_ session: URLSession,
streamTask: URLSessionStreamTask,
didBecome inputStream: InputStream,
outputStream: OutputStream
) {
print("didBecomeInputStream:(NSInputStream *)inputStream outputStream: OutputStream")
deviceCompletion?(true)
self.inputStream = inputStream
self.outputSteam = outputSteam
self.inputStream.delegate = self
self.outputSteam.delegate = self
self.inputStream.schedule(in: .main, forMode: .default)
self.inputStream.schedule(in: .main, forMode: .default)
self.inputStream.open()
self.inputStream.open()
}
// StreamDelegate
func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
// code exactly same as mac app delegate
}


Extensions on IO stream

extension InputStream {
private var maxLength: Int { return 4096 }
func read(data: inout Data) -> Int {
var totalReadCount: Int = 0
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: maxLength)
while hasBytesAvailable {
let numberOfBytesRead = read(buffer, maxLength: maxLength)
if numberOfBytesRead < 0, let error = streamError {
print("Read Error: \(error)")
break
}
data.append(buffer, count: numberOfBytesRead)
totalReadCount += numberOfBytesRead
}
return totalReadCount
}
}
extension OutputStream {
@discardableResult
func write(data: Data) -> Int {
if streamStatus != .open {
open()
}
let count = data.count
let result = data.withUnsafeBytes {
write($0.bindMemory(to: UInt8.self).baseAddress!, maxLength: count)
}
close()
return result
}
}


It would be really helpful if somebody can review my code and help me figure out the issue. I have a feeling that the issue is with stream open() and close(). Initially, nothing was working but adding open and close functions during write helped. Maybe I need a better way to fix this problem. PS: I had the same problem with CocoaAsyncSocket and I am not looking to use it or any other third party solution.


Thank you


Thomas

Answers 0

Related Articles