头歌 Flink Transformation(数据转换入门篇)

1.map实现数据清洗

package demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.api.common.functions.MapFunction;

import java.text.SimpleDateFormat;

public class Step1 {

public static DataStream<String> mapTest(DataStreamSource<String> dataStreamSource) {
    SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            String[] fields = value.split(",");
            String userId = fields[0];
            String gender = fields[1].equals("0") ? "男" : "女";
            Long timestamp = Long.parseLong(fields[2]);
            String dateString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(timestamp);
            return userId + "," + gender + "," + dateString;
        }
    });
    return result;
}
}

2.flatmap完成单词切割

package demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.util.Collector;

public class Step2 {

    public static DataStream<String> FlatMapTest(DataStreamSource<String> dataStreamSource) {
        DataStream<String> result = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split("\\s+");
                for (String word : words) {
                    out.collect(word);
                }
            }
        }).returns(Types.STRING);
        return result;
    }
}
 

3. filter完成数据清洗之过滤

package demo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

public class Step3 {

    /**
     * 流数据格式如下:
     * 17,女,2016-02-21 20:21:17 ---> 用户ID,用户性别,该用户在平台注册账号的时间戳
     * 需求如下:
     * 过滤掉注册年份低于2015(不包含2015)的数据
     *
     * @param dataStreamSource 流数据源
     * @return DataStream<String>
     */
    public static DataStream<String> FilterTest(DataStreamSource<String> dataStreamSource) {
        DataStream<String> filteredDataStream = dataStreamSource.filter(line -> {
            // 解析时间戳,获取注册年份
            String[] parts = line.split(",");
            String timestamp = parts[2];
            int registrationYear = Integer.parseInt(timestamp.split("-")[0]);
            // 过滤出注册年份在2015年之后的数据
            return registrationYear > 2015;
        });
        return filteredDataStream;
    }
}
 

4.reduce 完成累加操作与求最大值操作

package demo;

import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.ReduceOperator;

public class Step4 {

    /**
     * 需求:使用reduce,进行累加操作
     *
     * @param ds DataSource<Integer> (1,2,3,4,.....)
     * @return ReduceOperator<Integer>
     */
    public static ReduceOperator<Integer> ReduceTest(DataSource<Integer> ds) {
        // 使用reduce操作进行累加
        ReduceOperator<Integer> result = ds.reduce((value1, value2) -> value1 + value2);
        return result;
    }

    /**
     * 需求:使用reduce,得出最大值
     *
     * @param ds DataSource<Integer> (1,2,3,4,.....)
     * @return ReduceOperator<Integer>
     */
    public static ReduceOperator<Integer> ReduceTest2(DataSource<Integer> ds) {
        // 使用reduce操作得出最大值
        ReduceOperator<Integer> result = ds.reduce((value1, value2) -> Math.max(value1, value2));
        return result;
    }
}
 

5.综合案例 - 词频统计

package demo;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Step5 {

    /**
     * 需求:使用flatMap、groupBy、sum 等算子完成单词统计
     *
     * @param ds DataSource<String>  数据格式如下:
     *           area book business
     *           case child company country day eye
     * @return AggregateOperator<Tuple2<String, Integer>>
     */
    public static AggregateOperator<Tuple2<String, Integer>> WordCountTest(DataSource<String> ds) {
        // 使用flatMap将每行字符串拆分成单词并转换成元组
        AggregateOperator<Tuple2<String, Integer>> result = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("\\s")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT))
          // 使用groupBy对单词进行分组
          .groupBy(0)
          // 使用sum进行求和
          .sum(1);

        return result;
    }
}