1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| public class ConnectOperator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); sEnv.setParallelism(1);
Properties p = new Properties(); p.setProperty("bootstrap.servers", "localhost:9092");
SingleOutputStreamOperator<Student> student = sEnv .addSource(new FlinkKafkaConsumer010<String>("student", new SimpleStringSchema(), p)) .map(new MapFunction<String, Student>() { @Override public Student map(String value) throws Exception { return new Gson().fromJson(value, Student.class); } });
student.print();
SingleOutputStreamOperator<Teacher> teacher = sEnv .addSource(new FlinkKafkaConsumer010<String>("teacher", new SimpleStringSchema(), p)) .map(new MapFunction<String, Teacher>() { @Override public Teacher map(String value) throws Exception { return new Gson().fromJson(value, Teacher.class); } });
teacher.print();
ConnectedStreams<Student, Teacher> connect = student.connect(teacher);
connect.process(new CoProcessFunction<Student, Teacher, Tuple5<String, Integer, String, String, Long>>() { @Override public void processElement1(Student value, Context ctx, Collector<Tuple5<String, Integer, String, String, Long>> out) throws Exception { out.collect(new Tuple5<>(value.name, value.age, value.sex, value.classId, value.timestamp)); }
@Override public void processElement2(Teacher value, Context ctx, Collector<Tuple5<String, Integer, String, String, Long>> out) throws Exception { out.collect(new Tuple5<>(value.name, value.age, value.sex, value.classId, value.timestamp)); } }).print("process");
connect.map(new CoMapFunction<Student, Teacher, Tuple5<String, Integer, String, String, Long>>() { @Override public Tuple5<String, Integer, String, String, Long> map1(Student value) throws Exception { return new Tuple5<>(value.name, value.age, value.sex, value.classId, value.timestamp); }
@Override public Tuple5<String, Integer, String, String, Long> map2(Teacher value) throws Exception { return new Tuple5<>(value.name, value.age, value.sex, value.classId, value.timestamp); } }).print("map");
student.map(new MapFunction<Student, Tuple5<String, Integer, String, String, Long>>() { @Override public Tuple5<String, Integer, String, String, Long> map(Student value) throws Exception { return new Tuple5<>(value.name, value.age, value.sex, value.classId, value.timestamp); } }).union(teacher.map(new MapFunction<Teacher, Tuple5<String, Integer, String, String, Long>>() { @Override public Tuple5<String, Integer, String, String, Long> map(Teacher value) throws Exception { return new Tuple5<>(value.name, value.age, value.sex, value.classId, value.timestamp); } })).print("union");
sEnv.execute("ConnectOperator"); } }
|