2017-08-15 88 views
0

正如弗林克文档中提到,我可以打开使用发送数据流/遥控插座弗林克程序在主机操作系统上运行

[email protected]:~$ nc -l 12345 

本地套接字,然后接收读取来自文本服务器文本输入它使用

DataStream<String> text = env.socketTextStream("localhost", 12345); 

text.print(); 

env.execute(); 

然而,当我模拟了一些场景,所以我想从一个虚拟机(然后各个虚拟机的最终)获取数据流,并将其发送到主机操作系统上运行的CEP程序弗林克程序。

所以,我一直在使用安装了虚拟机,利用流浪和SSH到它vagrant ssh

  1. 来宾操作系统的主机名是precise64

  2. IP地址用ifconfig = 10.0 .2.15

现在,我现在想要做的就是看我是否可以从虚拟机发送一些数据并以Flink程序接收数据,就像我在本地环境中执行的一样。

我打开Netcat的插座上来宾操作系统使用

[email protected]:~$ nc -l 12345 

,我试图通过接受它的宿主程序,但遇到错误

DataStream<String> text = env.socketTextStream("precise64", 12345); 

text.print(); 

env.execute(); 

我也试过[email protected]以上,但我认为我做错了。

任何想法,我应该怎样的方法来从虚拟机发送数据流中主机弗林克计划

建议者居多,在此先感谢!

回答

1

你可以试试这个:

1.Program:

import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.api.windowing.time.Time 

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 

2.After跑到上面program.You可以开始这一点。

nc -lk 9999 

这会奏效。