Accepting Incoming Connections
Until now, we have been initiating all our connections.
To do so, we simply tell the IO subsystem to bind to a local port.
IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 8333))
That takes care of accepting connections. However there is something that we arguably should have done before. Writing
to the TCP stack has to be done one Tcp.Write
at a time. It was not a problem when we were not relaying blocks
because we were only sending GetHeaders
and GetData
, but with the transaction relay we are sending more stuff
in parallel.
Akka will reject a Tcp.Write if there is another one outstanding. There are several ways to handle the backpressure,
we are just going to do the simplest form. We ask for an Ack
message and buffer writes until we get it. When
we receive the Ack, we emit any outstanding message. As a matter of fact, collisions aren't very common because
we already buffer Inv in the peer actor.
var acknowledged = true
var writeBuffer = Queue.empty[ByteString]
private def sendMessage(bm: BitcoinMessage) = {
bm match {
case block: Block => log.info(s"block ${hashToString(block.header.hash)}")
case _ =>
}
val bytes = bm.toMessage()
writeBuffer :+= bytes
if (acknowledged) {
connection ! Tcp.Write(bytes, Ack)
acknowledged = false
}
}
private def acknowledge() = {
writeBuffer = writeBuffer.drop(1)
if (writeBuffer.isEmpty)
acknowledged = true
else
connection ! Tcp.Write(writeBuffer(0), Ack)
}