2017-04-01
2

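I want to write a Scala client that speaks a proprietary protocol over a TCP connection secured with TLS. How can I open a TCP connection with TLS in Scala using Akka?

Basically, I want to rewrite the following Node.js code in Scala: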

var tls = require('tls');

var conn_options = {
    host: endpoint,
    port: port
};
tlsSocket = tls.connect(conn_options, function() {
    if (tlsSocket.authorized) {
        logger.info('Successfully established a connection');

        // Now that the connection has been established, let's perform the handshake
        // Identification frame:
        // 1 | I | id_size | id
        var idFrameTypeAndVersion = "1I";
        var clientIdString = "foorbar";
        // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor
        var idDataBuffer = Buffer.alloc(idFrameTypeAndVersion.length + 1 + clientIdString.length);

        idDataBuffer.write(idFrameTypeAndVersion, 0, idFrameTypeAndVersion.length);

        // One-byte length field, written right after the two header characters
        idDataBuffer.writeUIntBE(clientIdString.length, idFrameTypeAndVersion.length, 1);
        idDataBuffer.write(clientIdString, idFrameTypeAndVersion.length + 1, clientIdString.length);

        // Send the identification frame to Logmet
        tlsSocket.write(idDataBuffer);
    }
    ...
});
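
For reference, the identification frame built by the Node.js code above could be assembled in Scala roughly as follows. This is only a sketch using the standard library: the object name `IdFrame` and the method `build` are made up for illustration, and it assumes the client id is plain ASCII so that one character equals one byte.

```scala
import java.nio.charset.StandardCharsets.US_ASCII

// Hypothetical helper, not part of Akka or of the original code.
object IdFrame {
  // Frame layout taken from the question: "1" | "I" | id_size (1 byte) | id
  def build(clientId: String): Array[Byte] = {
    val header = "1I".getBytes(US_ASCII)
    val id = clientId.getBytes(US_ASCII)
    // The length field is a single unsigned byte, so the id is limited to 255 bytes
    require(id.length <= 255, "client id must fit in a one-byte length field")
    header ++ Array(id.length.toByte) ++ id
  }
}
```

With Akka Streams the resulting array would typically be wrapped as `ByteString(IdFrame.build("foorbar"))` before being sent down the connection.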

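In the Akka documentation I found a good example of using Akka over plain TCP, but I don't know how to extend that example to use a TLS socket connection. Some older versions of the documentation showed an example with SSL/TLS, but I could not find it in the newer versions.

I found the documentation for a TLS object in Akka, but I did not find any good examples around it.

Many thanks in advance!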

+1

Usage examples for the TLS support can be found in its unit tests: https://github.com/akka/akka/blob/master/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala. Hope that helps!

Answers

4

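I got it working with the following code and wanted to share it.

Basically, I started by looking at TcpTlsEcho.java, which I got from the Akka community.

I followed the akka-streams documentation. Another good example that shows and explains the use of Akka Streams can be found in the following blog post

The connection setup and the flow look like this: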

/**
    +---------------------------+               +---------------------------+
    | Flow                      |               | tlsConnectionFlow         |
    |                           |               |                           |
    | +------+        +------+  |               |  +------+        +------+ |
    | | SRC  | ~Out~> |      | ~~> O2   --  I1 ~~> |      | ~O1~>  |      | |
    | |      |        | LOGG |  |               |  | TLS  |        | CONN | |
    | | SINK | <~In~  |      | <~~ I2   --  O2 <~~ |      | <~I2~  |      | |
    | +------+        +------+  |               |  +------+        +------+ |
    +---------------------------+               +---------------------------+
**/
// the TCP connection to the server
val connection = Tcp().outgoingConnection(address, port)

// ignore the received data for now; other Sink implementations can process it instead
val sink = Sink.ignore

// create a source that is materialized as an actor reference
val source = Source.actorRef[ByteString](1000, OverflowStrategy.fail)

// join the TLS BidiFlow (see below) with the connection
val tlsConnectionFlow = tlsStage(TLSRole.client).join(connection)

// run the source through the TLS connection flow, joined with a logging stage
// that prints the bytes sent to and received from the connection
val sourceActor = tlsConnectionFlow.join(logging).to(sink).runWith(source)

// messages sent to sourceActor are emitted by the Source and written to the connection
sourceActor ! ByteString("<message>")

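The TLS connection flow is a BidiFlow. My first simple example ignores all certificates and avoids managing trust and key stores. An example of how to do that properly can be found in the .java example mentioned above.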

import java.security.cert.X509Certificate
import javax.net.ssl.{KeyManager, SSLContext, TrustManager, X509TrustManager}

import akka.actor.ActorSystem
import akka.stream.{TLSClientAuth, TLSProtocol, TLSRole}
import akka.stream.scaladsl.{BidiFlow, Flow, TLS}
import akka.util.ByteString
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import com.typesafe.sslconfig.ssl.ClientAuth

def tlsStage(role: TLSRole)(implicit system: ActorSystem) = {
  val sslConfig = AkkaSSLConfig.get(system)
  val config = sslConfig.config

  // create an SSL context that accepts ANY certificate, self-signed or not
  // (insecure: suitable for testing only)
  implicit val sslContext: SSLContext = {
    object WideOpenX509TrustManager extends X509TrustManager {
      override def checkClientTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
      override def checkServerTrusted(chain: Array[X509Certificate], authType: String): Unit = ()
      override def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]()
    }

    val context = SSLContext.getInstance("TLS")
    context.init(Array[KeyManager](), Array[TrustManager](WideOpenX509TrustManager), null)
    context
  }

  // protocols
  val defaultParams = sslContext.getDefaultSSLParameters()
  val defaultProtocols = defaultParams.getProtocols()
  val protocols = sslConfig.configureProtocols(defaultProtocols, config)
  defaultParams.setProtocols(protocols)

  // ciphers
  val defaultCiphers = defaultParams.getCipherSuites()
  val cipherSuites = sslConfig.configureCipherSuites(defaultCiphers, config)
  defaultParams.setCipherSuites(cipherSuites)

  val firstSession = new TLSProtocol.NegotiateNewSession(None, None, None, None)
    .withCipherSuites(cipherSuites: _*)
    .withProtocols(protocols: _*)
    .withParameters(defaultParams)

  // NegotiateNewSession is immutable, so keep the copy returned by withClientAuth
  // (the original discarded it, which silently dropped the client-auth setting)
  val session = getClientAuth(config.sslParametersConfig.clientAuth)
    .fold(firstSession)(auth => firstSession.withClientAuth(auth))

  val tls = TLS.apply(sslContext, session, role)

  // keep only the decrypted payload bytes from the inbound TLS events
  val pf: PartialFunction[TLSProtocol.SslTlsInbound, ByteString] = {
    case TLSProtocol.SessionBytes(_, sb) => ByteString.fromByteBuffer(sb.asByteBuffer)
  }

  // wrap outgoing bytes as SendBytes and unwrap incoming SessionBytes
  val tlsSupport = BidiFlow.fromFlows(
    Flow[ByteString].map(TLSProtocol.SendBytes),
    Flow[TLSProtocol.SslTlsInbound].collect(pf))

  tlsSupport.atop(tls)
}

// translate the ssl-config client-auth setting into Akka's TLSClientAuth
def getClientAuth(auth: ClientAuth): Option[TLSClientAuth] = {
  if (auth == ClientAuth.want) {
    Some(TLSClientAuth.want)
  } else if (auth == ClientAuth.need) {
    Some(TLSClientAuth.need)
  } else if (auth == ClientAuth.none) {
    Some(TLSClientAuth.none)
  } else {
    None
  }
}
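
As a possible alternative to the wide-open trust manager above, an SSLContext can be built from a trust store using only the standard JSSE classes. The following is a minimal sketch, not the approach from the .java example: the object name `TrustStoreContext` and both method names are invented for illustration, and the store path and password are assumed to be supplied by the caller.

```scala
import java.io.FileInputStream
import java.security.KeyStore
import javax.net.ssl.{SSLContext, TrustManagerFactory}

// Hypothetical helper; names are illustrative only.
object TrustStoreContext {
  // Build an SSLContext that trusts only the certificates in the given KeyStore
  def fromKeyStore(trustStore: KeyStore): SSLContext = {
    val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
    tmf.init(trustStore)
    val context = SSLContext.getInstance("TLS")
    // null key managers: no client certificate is presented
    context.init(null, tmf.getTrustManagers, null)
    context
  }

  // Load a trust store file (in the JVM's default key store format) and build the context
  def fromFile(path: String, password: Array[Char]): SSLContext = {
    val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
    val in = new FileInputStream(path)
    try trustStore.load(in, password) finally in.close()
    fromKeyStore(trustStore)
  }
}
```

The returned context could then be passed to `TLS(...)` in place of the permissive one, so that server certificates are actually validated.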

And for completeness, here is the logging stage, which is also implemented as a BidiFlow.

def logging: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
  // returns a function that prints a chunk with a fixed prefix and passes the chunk on unchanged
  def logger(prefix: String) = (chunk: ByteString) => {
    println(prefix + chunk.utf8String)
    chunk
  }

  val inputLogger = logger("> ")
  val outputLogger = logger("< ")

  // create a BidiFlow with a separate logger function for each direction:
  // "< " marks bytes coming out of the connection flow, "> " marks bytes going into it
  BidiFlow.fromFunctions(outputLogger, inputLogger)
}
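
Because the protocol in the question is binary (it contains a one-byte length field), printing chunks with `utf8String` may produce unreadable output. One option is to print a hex dump instead. The helper below is a sketch using only the standard library; `HexDump.format` is a made-up name, not an Akka API.

```scala
// Hypothetical helper for logging binary frames; not part of Akka.
object HexDump {
  // Render each byte as two lowercase hex digits, separated by spaces
  def format(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString(" ")
}
```

Inside the logging stage this could be used as `println(prefix + HexDump.format(chunk.toArray))`, assuming `chunk` is an Akka `ByteString`.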

I will try to improve and update this answer further. Hope that helps.

+0

Welcome to Stack Overflow! Thanks for sharing your answer. It is too specific for me, but it may help others.

1

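I really liked Jeremias Werner's answer, since it got me where I needed to be. However, I would like to offer the code below (heavily influenced by his answer) as a "one cut-and-paste" solution that connects to an actual TLS server using as little code as possible.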

import javax.net.ssl.SSLContext 

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.TLSProtocol.NegotiateNewSession 
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, TLS, Tcp} 
import akka.stream.{ActorMaterializer, OverflowStrategy, TLSProtocol, TLSRole} 
import akka.util.ByteString 

object TlsClient {

  // Flow needed for TLS as well as mapping the TLS engine's flow to ByteStrings
  def tlsClientLayer = {

    // Default SSL context supporting most protocols and ciphers. Embellish this as you need
    // by constructing your own SSLContext and NegotiateNewSession instances.
    val tls = TLS(SSLContext.getDefault, NegotiateNewSession.withDefaults, TLSRole.client)

    // Maps the TLS stream to a ByteString
    val tlsSupport = BidiFlow.fromFlows(
      Flow[ByteString].map(TLSProtocol.SendBytes),
      Flow[TLSProtocol.SslTlsInbound].collect {
        case TLSProtocol.SessionBytes(_, sb) => ByteString.fromByteBuffer(sb.asByteBuffer)
      })

    tlsSupport.atop(tls)
  }

  // Very simple logger
  def logging: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {

    // returns a function that prints a chunk with a fixed prefix and passes the chunk on unchanged
    def logger(prefix: String) = (chunk: ByteString) => {
      println(prefix + chunk.utf8String)
      chunk
    }

    val inputLogger = logger("> ")
    val outputLogger = logger("< ")

    // create a BidiFlow with a separate logger function for each direction
    BidiFlow.fromFunctions(outputLogger, inputLogger)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sip-client")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val source = Source.actorRef[ByteString](1000, OverflowStrategy.fail)
    val connection = Tcp().outgoingConnection("www.google.com", 443)
    val tlsFlow = tlsClientLayer.join(connection)
    val srcActor = tlsFlow.join(logging).to(Sink.ignore).runWith(source)

    // I show HTTP here but send/receive your protocol over this actor
    // Should respond with a 302 (Found) and a small explanatory HTML message
    // Note the spaces in the request line: "GET / HTTP/1.1"
    srcActor ! ByteString("GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n")
  }
}