# 电商大促,「网站实时成交额」仪表大盘技术方案?

作者:Tom哥
公众号:微观技术
博客:https://offercome.cn (opens new window)
人生理念:知道的越多,不知道的越多,努力去学

小T 同学前段时间入职一家电商公司,正赶上公司在搞一场大型促销活动,整个活动周期是 6月18日一整天,老板想看下网站的实时 GMV 和 客单价。

听说小T搞过大数据,于是把这个工作安排给了小 T。

如果你是小T, 该如何完成这项工作呢?

这是阿里每年双十一 的实时交易大屏,给人一种收获的感觉,也是向老板展示这一年成果的时刻。

离线计算能满足对大量数据 进行复杂的批量计算 ,但是要求数据在计算之前已经固定,不再发生变化。而且时间周期比较长,一般都是T+1模式

将 MySQL 数据库中前一天订单数据提取到离线仓库,然后通过 Hive SQL 执行一条跑批任务语句,即可计算出结果

实现起来确实很简单,但不满足老板的要求

既然要求实时 ,那就不能采用离线数仓 玩法,我们需要换个思路

# 方案一(业务架构型)

可以采用常规业务架构 思路,通过 Kafka 完成系统解耦,通过开发消费任务 计算网站的交易额和客单价。

整个流程如下图所示,作为一个有多年 CRUD 开发经验 Boy,甚至都不用做技术方案,直接撸代码

统计任务可以用我们熟悉的 Java 代码来开发,其实就是一个普通的 MQ 异步消费任务。唯一有点挑战难度的就是统计去重后的下单客户总数

有的同学一拍脑袋,那还不简单,Set 集合 就可以呀

互联网的用户体量一般很大,如果上亿的数据量,Set 集合 并不友好

我们可以借鉴 布隆过滤器 的思想,定义一个 BitMap 的超长数组 ,不用担心会占用特别多内存,Tom哥做过实验,1千万的数据占用内存 1.19M,计算公式: 10000000/8/1024/1024 = 1.19M

BitMap 适用场景非常广泛,如:

我们希望记录自己网站上用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBIT 和 BITCOUNT 来实现。
比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1 。
举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。
当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。
地址:http://doc.redisfans.com/string/bitcount.html

详细内容,可以看下之前文章:《什么是布隆过滤器?如何解决高并发缓存穿透问题?》 (opens new window)

方案一确实能满足工作要求,性能也不错,用的技术也是我们熟悉的。但是,勇于挑战的我们是不是应该有个更好的技术方案

# 方案二(流式计算)

现在大数据技术这么普及,我们可以使用流式计算来统计数据,比如 Flink 框架

技术方案与上面的流程类似,区别在与任务采用了 Flink 框架,通过消息事件 驱动的方式来触发任务

# 1、环境配置

安装 Kafka、Redis 等中间件,之前的一篇文章有详细介绍

https://articles.zsxq.com/id_t9j3o8i3hb6z.html

创建一个订单 Topic,名字为 create_order_topic ,用来存储下单消息

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic_1

# 2、创建一个订单消息实体

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {

    private Long userId; //用户id
    private Long itemId; //商品id
    private Double price;//订单金额
    private Integer count;//购买数量
    private Long time;
}
1
2
3
4
5
6
7
8
9
10
11

# 3、模拟生产端,往 Kafka 的 create_order_topic_1 主题投递消息

static List<OrderMessage> orderMessageList = Lists.newArrayList();

    /**
     * 初始化数据
     */
    static {
        orderMessageList.add(new OrderMessage(1L, 170170001L, 100d, 1, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170002L, 200d, 2, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170003L, 300d, 3, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170004L, 400d, 1, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170005L, 500d, 5, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170006L, 600d, 1, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170007L, 700d, 3, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(1L, 170170008L, 100d, 9, System.currentTimeMillis()));

        orderMessageList.add(new OrderMessage(2L, 170170009L, 100d, 5, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(3L, 170170010L, 100d, 6, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(2L, 170170011L, 100d, 9, System.currentTimeMillis()));
        orderMessageList.add(new OrderMessage(2L, 170170012L, 100d, 1, System.currentTimeMillis()));

    }


    /**
     * 模拟生产订单数据
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {

        for (int i = 0; i < orderMessageList.size(); i++) {
            ctx.collect(JSON.toJSONString(orderMessageList.get(i)));
            //每 4 秒 产生一条数据
            Thread.sleep(5000);
        }
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

首先,添加绑定 Kafka 数据源,并订阅下单消息 topic

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//采用 Event-Time 来作为 时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置 Checkpoint 时间周期为 60 秒
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
env.setParallelism(1);

// 配置 kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromLatest();

DataStream<String> stream = env.addSource(consumer);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

对接收的数据反序列化,转化为 OrderMessage 对象

// 数据反序列化
DataStream<OrderMessage> orderStream = stream.map(message -> {
    //System.out.println(message);
    return JSON.parseObject(message, OrderMessage.class);
});

1
2
3
4
5
6

中国时区, 统计区间每天的 0 点~ 24 点的数据进行计算

TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))

生产环境,一天只输出一次计算结果不太合理,我们可以通过 trigger 设置触发器,以一定的频率输出计算结果

trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4))) 每隔 4 秒触发一次计算,并输出中间结果

通过 KeySelector 按天对数据分组,并且通过 evictor 剔除已经计算过的数据,避免大量的数据堆积在内存中,把内存撑爆

SingleOutputStreamOperator<Tuple3<String, String, String>> single = orderStream.keyBy(new KeySelector<OrderMessage, String>() {
    @Override
    public String getKey(OrderMessage value) throws Exception {
        return timeStampToDate(value.getTime());
    }
})
        .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4)))
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new GMVProcessWindowFunctionBitMap());


1
2
3
4
5
6
7
8
9
10
11
12

CountEvictor 剔除有三种:

    1. CountEvictor:数量剔除器。在 Window 中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
    1. DeltaEvictor:阈值剔除器。计算 Window 中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
    1. TimeEvictor:时间剔除器。保留 Window 中最近一段时间内的元素,并丢弃其余元素。

TimeEvictor.of(Time.seconds(0), true) 剔除已经计算过的数据元素,释放内存

系统交易额汇总计算,下过订单的用户 id (Long 类型) 放在 Roaring64NavigableMap 去重,中间计算结果通过 ValueState 来上下传递

public class GMVProcessWindowFunctionBitMap extends ProcessWindowFunction<OrderMessage, Tuple3<String, String, String>, String, TimeWindow> {
    private transient ValueState<Double> gmvState;
    private transient ValueState<Roaring64NavigableMap> bitMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        gmvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Double>("gmv", Double.class));
        bitMapState = this.getRuntimeContext().getState(new ValueStateDescriptor("bitMap", TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {
        })));
    }

    @Override
    public void process(String s, Context context, Iterable<OrderMessage> elements, Collector<Tuple3<String, String, String>> out) throws Exception {
        Double gmv = gmvState.value();
        Roaring64NavigableMap bitMap = bitMapState.value();
        if (bitMap == null) {
            bitMap = new Roaring64NavigableMap();
            gmv = 0d;
        }

        Iterator<OrderMessage> iterator = elements.iterator();
        while (iterator.hasNext()) {
            OrderMessage orderMessage = iterator.next();
            System.out.println("GMVProcessWindowFunctionBitMap = " + JSON.toJSONString(orderMessage));
            gmv = gmv + orderMessage.getPrice();
            Long userId = orderMessage.getUserId();
            bitMap.add(userId);

            gmvState.update(gmv);
            bitMapState.update(bitMap);
        }
        out.collect(Tuple3.of(s, "userCount", String.valueOf(bitMap.getIntCardinality())));
        out.collect(Tuple3.of(s, "gmv", String.valueOf(gmv)));

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 源码地址

https://github.com/aalansehaiyang/xq_project

上次更新: 2023/3/7