# 电商大促,「网站实时成交额」仪表大盘技术方案?
作者:Tom哥
公众号:微观技术
博客:https://offercome.cn (opens new window)
人生理念:知道的越多,不知道的越多,努力去学
小T 同学前段时间入职一家电商公司,正赶上公司在搞一场大型促销活动,整个活动周期是 6月18日一整天,老板想看下网站的实时 GMV 和 客单价。
听说小T搞过大数据,于是把这个工作安排给了小 T。
如果你是小T, 该如何完成这项工作呢?
这是阿里每年双十一 的实时交易大屏,给人一种收获的感觉,也是向老板展示这一年成果的时刻。
离线计算能满足对大量数据 进行复杂的批量计算 ,但是要求数据在计算之前已经固定,不再发生变化。而且时间周期比较长,一般都是T+1模式
将 MySQL 数据库中前一天订单数据提取到离线仓库,然后通过 Hive SQL 执行一条跑批任务语句,即可计算出结果
实现起来确实很简单,但不满足老板的要求
既然要求实时
,那就不能采用离线数仓
玩法,我们需要换个思路
# 方案一(业务架构型)
可以采用常规业务架构 思路,通过 Kafka 完成系统解耦,通过开发消费任务 计算网站的交易额和客单价。
整个流程如下图所示,作为一个有多年 CRUD 开发经验 Boy,甚至都不用做技术方案,直接撸代码
统计任务可以用我们熟悉的 Java 代码来开发,其实就是一个普通的 MQ 异步消费任务。唯一有点挑战难度的就是统计去重后的下单客户总数
有的同学一拍脑袋,那还不简单,Set 集合
就可以呀
互联网的用户体量一般很大,如果上亿的数据量,Set 集合 并不友好
我们可以借鉴 布隆过滤器
的思想,定义一个 BitMap
的超长数组 ,不用担心会占用特别多内存,Tom哥做过实验,1千万的数据占用内存 1.19M,计算公式: 10000000/8/1024/1024 = 1.19M
BitMap 适用场景非常广泛,如:
我们希望记录自己网站上用户的上线频率,比如说,计算用户 A 上线了多少天,用户 B 上线了多少天,诸如此类,以此作为数据,从而决定让哪些用户参加 beta 测试等活动 —— 这个模式可以使用 SETBIT 和 BITCOUNT 来实现。
比如说,每当用户在某一天上线的时候,我们就使用 SETBIT ,以用户名作为 key ,将那天所代表的网站的上线日作为 offset 参数,并将这个 offset 上的为设置为 1 。
举个例子,如果今天是网站上线的第 100 天,而用户 peter 在今天阅览过网站,那么执行命令 SETBIT peter 100 1 ;如果明天 peter 也继续阅览网站,那么执行命令 SETBIT peter 101 1 ,以此类推。
当要计算 peter 总共以来的上线次数时,就使用 BITCOUNT 命令:执行 BITCOUNT peter ,得出的结果就是 peter 上线的总天数。
地址:http://doc.redisfans.com/string/bitcount.html
详细内容,可以看下之前文章:《什么是布隆过滤器?如何解决高并发缓存穿透问题?》 (opens new window)
方案一确实能满足工作要求,性能也不错,用的技术也是我们熟悉的。但是,勇于挑战的我们是不是应该有个更好的技术方案
# 方案二(流式计算)
现在大数据技术这么普及,我们可以使用流式计算
来统计数据,比如 Flink 框架
技术方案与上面的流程类似,区别在与任务采用了 Flink 框架,通过消息事件
驱动的方式来触发任务
# 1、环境配置
安装 Kafka、Redis 等中间件,之前的一篇文章有详细介绍
https://articles.zsxq.com/id_t9j3o8i3hb6z.html
创建一个订单 Topic,名字为 create_order_topic
,用来存储下单消息
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic create_order_topic_1
# 2、创建一个订单消息实体
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderMessage {
private Long userId; //用户id
private Long itemId; //商品id
private Double price;//订单金额
private Integer count;//购买数量
private Long time;
}
2
3
4
5
6
7
8
9
10
11
# 3、模拟生产端,往 Kafka 的 create_order_topic_1 主题投递消息
static List<OrderMessage> orderMessageList = Lists.newArrayList();
/**
* 初始化数据
*/
static {
orderMessageList.add(new OrderMessage(1L, 170170001L, 100d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170002L, 200d, 2, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170003L, 300d, 3, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170004L, 400d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170005L, 500d, 5, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170006L, 600d, 1, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170007L, 700d, 3, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(1L, 170170008L, 100d, 9, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170009L, 100d, 5, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(3L, 170170010L, 100d, 6, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170011L, 100d, 9, System.currentTimeMillis()));
orderMessageList.add(new OrderMessage(2L, 170170012L, 100d, 1, System.currentTimeMillis()));
}
/**
* 模拟生产订单数据
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (int i = 0; i < orderMessageList.size(); i++) {
ctx.collect(JSON.toJSONString(orderMessageList.get(i)));
//每 4 秒 产生一条数据
Thread.sleep(5000);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 4、Flink 任务端接收 Kafka 订单消息,并聚合处理
首先,添加绑定 Kafka 数据源,并订阅下单消息 topic
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//采用 Event-Time 来作为 时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置 Checkpoint 时间周期为 60 秒
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
env.setParallelism(1);
// 配置 kafka 参数
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(createOrderTopic, new SimpleStringSchema(), properties);
// 从最早开始消费
consumer.setStartFromLatest();
DataStream<String> stream = env.addSource(consumer);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
对接收的数据反序列化,转化为 OrderMessage
对象
// 数据反序列化
DataStream<OrderMessage> orderStream = stream.map(message -> {
//System.out.println(message);
return JSON.parseObject(message, OrderMessage.class);
});
2
3
4
5
6
中国时区, 统计区间每天的 0 点~ 24 点的数据进行计算
TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
生产环境,一天只输出一次计算结果不太合理,我们可以通过 trigger
设置触发器,以一定的频率输出计算结果
trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4))) 每隔 4 秒触发一次计算,并输出中间结果
通过 KeySelector
按天对数据分组,并且通过 evictor 剔除已经计算过的数据,避免大量的数据堆积在内存中,把内存撑爆
SingleOutputStreamOperator<Tuple3<String, String, String>> single = orderStream.keyBy(new KeySelector<OrderMessage, String>() {
@Override
public String getKey(OrderMessage value) throws Exception {
return timeStampToDate(value.getTime());
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(4)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new GMVProcessWindowFunctionBitMap());
2
3
4
5
6
7
8
9
10
11
12
CountEvictor 剔除有三种:
- CountEvictor:数量剔除器。在 Window 中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
- DeltaEvictor:阈值剔除器。计算 Window 中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
- TimeEvictor:时间剔除器。保留 Window 中最近一段时间内的元素,并丢弃其余元素。
TimeEvictor.of(Time.seconds(0), true) 剔除已经计算过的数据元素,释放内存
系统交易额汇总计算,下过订单的用户 id (Long 类型) 放在 Roaring64NavigableMap
去重,中间计算结果通过 ValueState
来上下传递
public class GMVProcessWindowFunctionBitMap extends ProcessWindowFunction<OrderMessage, Tuple3<String, String, String>, String, TimeWindow> {
private transient ValueState<Double> gmvState;
private transient ValueState<Roaring64NavigableMap> bitMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
gmvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Double>("gmv", Double.class));
bitMapState = this.getRuntimeContext().getState(new ValueStateDescriptor("bitMap", TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {
})));
}
@Override
public void process(String s, Context context, Iterable<OrderMessage> elements, Collector<Tuple3<String, String, String>> out) throws Exception {
Double gmv = gmvState.value();
Roaring64NavigableMap bitMap = bitMapState.value();
if (bitMap == null) {
bitMap = new Roaring64NavigableMap();
gmv = 0d;
}
Iterator<OrderMessage> iterator = elements.iterator();
while (iterator.hasNext()) {
OrderMessage orderMessage = iterator.next();
System.out.println("GMVProcessWindowFunctionBitMap = " + JSON.toJSONString(orderMessage));
gmv = gmv + orderMessage.getPrice();
Long userId = orderMessage.getUserId();
bitMap.add(userId);
gmvState.update(gmv);
bitMapState.update(bitMap);
}
out.collect(Tuple3.of(s, "userCount", String.valueOf(bitMap.getIntCardinality())));
out.collect(Tuple3.of(s, "gmv", String.valueOf(gmv)));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 源码地址
https://github.com/aalansehaiyang/xq_project