2017-05-19
Streaming 中 Receiver 相关源码分析

本文基于 spark 1.6.2
本次的源码全来自 org.apache.spark.streaming.receiver 这个 package 下,包括 BlockGenerator.scala, RateLimiter, ReceiverdBlock.scala, ReceivedBlockHandler.scala, Receiver.scala, ReceiverSupervisor.scala, ReceiverSupervisorImpl.scala

Read More

2017-02-16
Spark Streaming 统一在每分钟的 00 秒消费 Kafka 数据的问题解决

现象

一批 Spark Streaming 会统一在每分钟的 00 秒开始消费 kafka 数据

Read More

2016-12-02
Spark Streaming 从指定时间戳开始消费 kafka 数据

需求

从指定时间戳(比如 2 小时)开始消费 Kafka 数据

Read More

2016-11-26
Spark Streaming 往 HDFS 写文件,自定义文件名

需求

将 kafka 上的数据实时同步到 HDFS,不能有太多小文件

Read More

2016-11-01
Spark Streaming 自适应上游 kafka topic partition 数目变化

背景

Spark Streaming 作业在运行过程中,上游 topic 增加 partition 数目从 A 增加到 B,会造成作业丢失数据,因为该作业只从 topic 中读取了原来的 A 个 partition 的数据,新增的 B-A 个 partition 的数据会被忽略掉。

Read More

2016-10-22
Storm 的可靠性保证测试

文章首发于 美团点评技术博客

Storm 是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。

在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。

在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。

Storm 的消息保证机制

Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。

消息完全处理

每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:

1
2
3
4
5
6
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum);
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));

这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:

Tuple 树示例图

上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。

如何实现不同层次的消息保证机制

spout_bolt_acker

Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。

Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。

Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:

  • 关闭 ACK 机制,即 Acker 数目设置为 0
  • Spout 不实现可靠性传输

  • Spout 发送消息是使用不带 message ID 的 API

  • 不实现 fail 函数
  • Bolt 不把处理成功或失败的消息发送给 Acker

如果需要实现 At Least Once 语义,则需要同时保证如下几条:

  • 开启 ACK 机制,即 Acker 数目大于 0
  • Spout 实现可靠性传输保证
  • Spout 发送消息时附带 message 的 ID
  • 如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
  • Bolt 在处理成功或失败后需要调用相应的方法通知 Acker
    实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。

下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个 B ,下游收到两个 B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。

三种消息保证机制比较图

测试目的

Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:

  • 三种消息保证机制的表现,是否与官方的描述相符;
  • At Most Once 语义下,消息的丢失率和什么有关系、关系如何;
  • At Least Once 语义下,消息的重复率和什么有关系、关系如何。

测试环境

本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker 分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。

三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计,如下图所示:

测试流程示意图

测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。

测试场景

对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。

At Most Once

从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。

输入数据

保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。

测试结果














































异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
050000050000000%0
01000000100000000%0
02000000200000000%0
03000000300000000%0














































异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量
1300000027749402250607.50%0
23000000230708769291323.09%0
33000000208282391717730.57%0
430000001420725157927552.64%0

结论

不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目异常次数正相关。与官方文档描述相符,符合预期。

At Least Once

为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。

测试数据

Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。

测试结果

Acker 发生异常的情况



















































异常的次数 测试数据总量 结果集中不重复的 Tuple 数 数据重复的次数(>1) 出现重复的 Tuple 数 数据丢失数量 最大积压量
0 100000 100000 - - 0 2000(默认值)
0 200000 200000 - - 0 2000
0 300000 300000 - - 0 2000
0 400000 400000 - - 0 2000



















































异常的次数 测试数据总量 结果集中不重复的 Tuple 数 数据重复的次数(>1) 出现重复的 Tuple 数 数据丢失数量 最大积压量
1 100000 100000 2 2000 0 2000
2 100000 100000 2 4001 0 2000
3 100000 100000 2 6000 0 2000
4 100000 100000 2 8000 0 2000

Spout 发生异常的情况













































异常的次数 测试数据总量 结果集中不重复的 Tuple 数 数据重复的次数(>1) 出现重复的 Tuple 数 数据丢失数量
0 100000 100000 - - 0
0 200000 200000 - - 0
0 300000 300000 - - 0
0 400000 400000 - - 0






















































异常的次数 测试数据总量 结果集中不重复的 Tuple 数 数据重复的次数(>1) 出现重复的 Tuple 数 数据丢失数量
1 100000 100000 2 2052 0
2 100000 100000 2 4414 0
4 100000 100000 2 9008 0
6 100000 100000 2 9690 0
3 1675 0

Bolt 发生异常的情况

调用 emit 函数之前发生异常









































异常次数 结果集中不重复的 Tuple 数 数据重复的次数 (>1) 出现重复的 Tuple 数 数据丢失量
0 100000 - - 0
0 200000 - - 0
0 300000 - - 0
0 400000 - - 0
















































异常次数 结果集中不重复的 Tuple 数 数据重复的次数 (>1) 出现重复的 Tuple 数 数据丢失量
1 100000 - - 0
2 100000 - - 0
4 100000 - - 0
8 100000 - - 0
10 100000 - - 0

调用 emit 函数之后发生异常








































异常次数 结果集中不重复的 Tuple 数 数据重复的次数(>1) 出现重复的 Tuple 数 数据丢失数量
0 100000 - - 0
0 200000 - - 0
0 300000 - - 0
0 400000 - - 0
















































异常次数 结果集中不重复的 Tuple 数 数据重复的次数(>1) 出现重复的 Tuple 数 数据丢失数量
1 100000 2 2 0
2 100000 2 3 0
4 100000 2 5 0
8 100000 2 9 0
10 100000 2 11 0

结论

从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。

  • 不发生任何异常的情况下,消息不会重复不会丢失。
  • Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。
  • Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。
  • Bolt 发生异常的情况:
  • emit 之前发生异常,消息不会重复。
  • emit 之后发生异常,消息重复的次数等于异常的次数。
    结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。

Exactly Once

对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。

测试数据

Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。

测试结果

Spout 发生异常情况






























异常数 测试数据量 结果集中不重复的 Tuple 数 结果集中所有 Tuple 的总和
1 10000 10000 50005000
2 10000 10000 50005000
3 10000 10000 50005000

Acker 发生异常的情况





























异常数 测试数据量 结果集中不重复的 Tuple 数 结果集中所有 Tuple 的总和
1 10000 10000 50005000
2 10000 10000 50005000
3 10000 10000 50005000

Bolt 发生异常的情况





























异常数 测试数据量 结果集中不重复的 Tuple 数 结果集中所有 Tuple 的总和
1 10000 10000 50005000
2 10000 10000 50005000
3 10000 10000 50005000

结论

在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。

总结

对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。

不同消息可靠性保证的使用场景

对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:






























可靠性保证层次优点缺点使用场景
At most once 处理速度快 数据可能丢失 都处理速度要求高,且对数据丢失容忍度高的场景
At least once 数据不会丢失 数据可能重复 不能容忍数据丢失,可以容忍数据重复的场景
Exactly once 数据不会丢失,不会重复 处理速度慢 对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额

如何实现不同层次的消息可靠性保证

对于 At Least Once 的保证需要做如下几步:

  1. 需要开启 ACK 机制,即 Topology 中的 Acker 数量大于零;
  2. Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId,并且实现失败消息重传功能(fail 函数 ,可以参考下面的 Spout 代码);
  3. Bolt 在发送消息时,需要调用 emit(inputTuple, outputTuple)进行建立 anchor 树(参考下面建立 anchor 树的代码),并且在成功处理之后调用 ack ,处理失败时调用 fail 函数,通知 Acker。

不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。

不同层次的可靠性保证如何实现

如何实现可靠的 Spout

实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 想实现可靠的 Spout,需要实现如下两点
* 1\. 在 nextTuple 函数中调用 emit 函数时需要带一个 msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
* 2\. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)
*/
public void nextTuple() {
//设置 msgId 和 Value 一样,方便 fail 之后重发
collector.emit(new Values(curNum + "", round + ""), curNum + ":" + round);
}
@Override
public void fail(Object msgId) {//消息发送失败时的回调函数
String tmp = (String)msgId; //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
String[] args = tmp.split(":");
//消息进行重发
collector.emit(new Values(args[0], args[1]), msgId);
}

如何实现可靠的 Bolt

Storm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//BaseRichBolt 实现不可靠消息传递
public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(new Values(word)); // 不建立 anchor 树
}
_collector.ack(tuple); //手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
//BaseRichBolt 实现可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word)); // 建立 anchor 树
}
_collector.ack(tuple); //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
下面的示例会可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList&lt;Tuple&gt;();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
//BaseBasicBolt 是吸纳可靠的消息传递
public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

Trident

在 Trident 中,Spout 和 State 分别有三种状态,如下图所示:

Trident Spout 和 State 的状态图

其中表格中的 Yes 表示相应的 Spout 和 State 组合可以实现 Exactly Once 语义,No 表示相应的 Spout 和 State 组合不保证 Exactly Once 语义。下面的代码是一个 Trident 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); //Opaque Spout
//TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf); //Transaction Spout
TridentTopology topology = new TridentTopology();
String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);
Stream stream = topology.newStream(spoutTxid, spout)
.name("new stream")
.parallelismHint(1);
// kafka config
KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(); //KafkaProducerConfig 仅对 kafka 相关配置进行了封装,具体可以参考 TridentKafkaStateFactory2(Map<String, String> config)
Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);
TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper(); //TridentToKafkaMapper 继承自 TridentTupleToKafkaMapper<String, String>,实现 getMessageFromTuple 接口,该接口中返回 tridentTuple.getString(0);
String dstTopic = "test__topic_for_all";
TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);
stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);
stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));
stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word")) //从spout 出来数据是一个 bytes 类型的数据,第二个是参数是自己的处理函数,第三个参数是处理函数的输出字段
.name("write2kafka")
.partitionPersist(stateFactory //将数据写入到 Kafka 中,可以保证写入到 Kafka 的数据是 exactly once 的
, new Fields("word")
, new TridentKafkaUpdater())
.parallelismHint(1);

关注我们的官方微信公众号“美团点评技术团队”。现在就拿出手机,扫一扫:

公众号二维码

2016-08-27
Spark Streaming 从 Kafka 读取 binlog 转换成 Json

在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果

如果本文有讲述不详细,或者错误指出,肯请指出,谢谢

对于 binlog 数据,每一次操作(INSERT/UPDATE/DELETE 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)

在数据库中,有 id, name,age 三个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下

id name age
1 john 30
2 john 40

那么你进行操作

update table set age = 50 where name = “john”

的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。

下面,我们给出具体的代码,然后对代码进行分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def desirializeByte(b: (String, Array[Byte])) : (String, String) = {
val binlogEntry = BinlogEntryUtil.serializeToBean(b._2) //将 Array[Byte] 数据转换成 com.meituan.data.binlog.BinlogEntry 类,相关类定义参考附录
val pkeys = binlogEntry.getPrimaryKeys.asScala //获取主键,这里的 asScala 将 Java 的 List 转换为 Scala 的 List
val rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList //获取具体的信息
val strRowDatas = rowDatas.map(a => { //将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面进行 Json 化
val b = a.getBeforeColumns.asScala //获取 beforColumns
val c = a.getAfterColumns.asScala //获取 afterColumns
val mb = b.map(d => (d._1, d._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值
val mc = c.map(c => (c._1, c._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值
(mb, mc) //返回转换后的 beforeColumns 和 afterColumns
})
//下面利用 json4s 进行 Json 化
(binlogEntry.getEventType, compact("rowdata" -> strRowDatas.map{
w => List("row_data" -&gt; ("before" -&gt; w._1.toMap) ~ ("after" -&gt; w._2.toMap)) //这里的两个 toMap 是必要的,不然里层会变成 List,这个地方比较疑惑的是,
//w._1 按理是 Map类型,为什么还需要强制转换成 Map
//而且用 strRowDatas.foreach(x => println(s"${x._1} ${x._2}")打印的结果表名是 Map
}))</pre>
desirializeByte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (String, String) 的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete 等),第二个字段为 Json 化后的数据。
BinlogEntryUtil.serilizeToBean 是一个辅助类,将 binlog 数据转化为一个 Java bean 类。
第 4 行,我们得到表对应的主键,第 5 行获得具体的数据
第 6 行到第 12 行是 Json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。
第 14, 15 行就是具体的 Json 工作了(使用了 json4s 包进行 Json 化)
这个过程中有一点需要注意的是,在 Json 化的时候,记得为 w._1 和 w._2 加 toMap 操作,不然会变成 List(很奇怪,我将 w._1 和 w._2 打印出来看,都是 Map 类型)或者你可以在第 7,8 行的末尾加上 .toMap 操作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。
利用上述代码,我们可以得到下面这样 Json 化之后的字符串(我进行了排版,程序返回的 Json 串是不换行的)
<pre class="font-size:8 lang:default decode:true">{"rowdata":
[{"row_data":
{"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},
"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}
}
}]
}

到这里,基本就完成了一种将 binlog 数据 Json 化的代码。

附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public static BinlogEntry serializeToBean(byte[] input) {
BinlogEntry binlogEntry = null;
Entry entry = deserializeFromProtoBuf(input);//从 protobuf 反序列化
if(entry != null) {
binlogEntry = serializeToBean(entry);
}
return binlogEntry;
}
public static Entry deserializeFromProtoBuf(byte[] input) {
Entry entry = null;
try {
entry = Entry.parseFrom(input);
//com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成
} catch (InvalidProtocolBufferException var3) {
logger.error("Exception:" + var3);
}
return entry;
}
//将 Entry 解析为一个 bean 类
public static BinlogEntry serializeToBean(Entry entry) {
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception var8) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);
}
BinlogEntry binlogEntry = new BinlogEntry();
String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");
String logFileNo = "000000";
if(logFileNames.length > 1) {
logFileNo = logFileNames[1];
}
binlogEntry.setBinlogFileName(logFileNo);
binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());
binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());
binlogEntry.setTableName(entry.getHeader().getTableName());
binlogEntry.setEventType(entry.getHeader().getEventType().toString());
Iterator primaryKeysList = rowChange.getRowDatasList().iterator();
while(primaryKeysList.hasNext()) {
RowData rowData = (RowData)primaryKeysList.next();
BinlogRow row = new BinlogRow(binlogEntry.getEventType());
row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));
row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));
binlogEntry.addRowData(row);
}
if(binlogEntry.getRowDatas().size() >= 1) {
BinlogRow primaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);
binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));
} else {
ArrayList primaryKeysList2 = new ArrayList();
binlogEntry.setPrimaryKeys(primaryKeysList2);
}
return binlogEntry;
}
public class BinlogEntry implements Serializable {
private String binlogFileName;
private long binlogOffset;
private long executeTime;
private String tableName;
private String eventType;
private List<String> primaryKeys;
private List<BinlogRow> rowDatas = new ArrayList();
}
public class BinlogRow implements Serializable {
public static final String EVENT_TYPE_INSERT = "INSERT";
public static final String EVENT_TYPE_UPDATE = "UPDATE";
public static final String EVENT_TYPE_DELETE = "DELETE";
private String eventType;
private Map<String, BinlogColumn> beforeColumns;
private Map<String, BinlogColumn> afterColumns;
}
public class BinlogColumn implements Serializable {
private int index;
private String mysqlType;
private String name;
private boolean isKey;
private boolean updated;
private boolean isNull;
private String value;
}

 

2016-03-30
MIT 6.824 Lab 2 Part A

做这个实验,最能学习的地方就是调试和思考的过程了,如果你直接参考了别人的思路或者代码,那么对于你来说,这个实验能学习到的东西则会大大减少

记录 MIT 6.824 Lab 2 中 Part A的一些想法以及思路,如果错误,还请指出,谢谢

Lab 2 的链接如下http://nil.csail.mit.edu/6.824/2015/labs/lab-2.html,其中 Part A 要求实现一个 ViewService,根据 Server的状态,进行相应的 View 切换(这里 View 表示当前能提供服务的 Server 以及相应的状态组合,ViewService 提供 View 的增删改查功能),这里将该 Lab 的两个部分分开来写。

Part A 实现 ViewService 的整个功能,ViewService 需要保证如下几点:

  1. 在以下几种情况中的任何一种发生的时候才进行 View 的切换

    1. primary 和 backup 都没有 ack
    2. primary 或者 backup 重启
    3. backup 为空,且有闲置的 Server(会发送 Ping 命令给 ViewService)
  2. 只有在 primary ack 过了当前的 View 之后,才能进行 View 的切换,换句话说,如果 primary 收到一个新的 View,然后挂了,那么 ViewService 就不应该切换 View(根据页面的介绍,必须 ack 当前 View,那么这里有一个问题,如果 primary 收到一个新的 View,然后重启了,这种情况做何处理?),这个限制简化了架构以及实现,但是可能导致一直不能更换 View

  3. 如果 primary 或 backup 在约定好的时间内没有发送 Ping 命令,则认为该 Server 挂了,需要做相应的操作
  4. View 的 primary 只能是当前 View 的 primary 或者前一个 View 的 backup(在 ViewService 初始化的时候,primary 是第一个连接进来的 Server)
  5. View 的 backup 可以是除 primary 之外的任何 Server,可为空
    Part A 的要求实现如下三个函数:
    func (vs ViewServer) Ping(args PingArgs, reply PingReply) error {}
    func (vs
    ViewServer) Get(args GetArgs, reply GetReply) error {}
    func (vs *ViewServer) tick() {}

    其中 Ping 接受 Server 发送过来的信息,并更新 View 的相应情况,Get 获取当前的 View,tick 则是一个回调函数,在固定时间内调用一次,检查 primary 和 backup 是否已经宕机,这里我实现的 Get 很简单,直接返回当前 View(在 ViewServer 里面定义一个字段 curView 用来表示当前 View),其他两个才是重点

先把我定义的 ViewServer 贴一下(这个应该不算贴代码吧),下面能够更好的进行描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type ViewServer struct {
mu sync.Mutex
l net.Listener
dead int32 // for testing
rpccount int32 // for testing
me string
// Your declarations here.
lastPing map[string]time.Time //记录 server 上次请求的时间,用来判断是否宕机
curView View //当前 View
hasView bool //当前是否有 View 存在
hasAcked bool //Primary 是否已经 ack 了当前 View
secondBackup string //将要被提升为 backup 的server
}

先说 tick,在 tick 中,首先我们需要知道 primary 是否已经 ack 了当前 View,如果没有 ack,那么就直接返回即可,如果 ack 过了当前 View,那么就继续进行下面的操作(下面的操作必须在 primary ack 过了当前 View 之后才能进行

  1. 判断 Primary 是否超时

    1. 如果超时,则判断当前 View 是否存在 backup

      1. 存在 backup,则将 backup 提升为 primary,然后将 primary 从 lastPing 中删除,并且将 hasAcked 置为 false
      2. 不存在,将 hasView 置为 false
    2. 不超时,不做操作

  2. 判断 backup 是否超时

    1. 超时,则将 curView 中的 backup 置为 “”,然后 hasAcked 置为 false
    2. 不超时,不做操作
      然后接下来是 Ping 函数
  3. 判断当前是否有 View(通过 hasView)

    1. 没有,就将当前发送 Ping 的 Server 当成 Primary,然后返回
    2. 有当前 View

      1. 发送 Ping 的 Server 是 什么角色?

        1. 是 primary,考虑 primary 是否重启

          1. 重启(通过 ping 命令 请求参数是否为 0 判断),则判断当前 View 是否有 backup

            1. 有,将 backup 提升为 primary,然后将 primary 设置成 secondBackup(会在下次请求的时候加入到 View 中),这样的实现,是否合理,是否需要直接将 primary 设置为 backup?
            2. 没有,则将当前 View 的 Viewnum 加 1 即可
          2. 不是重启,则 ack 当前 View(请求参数可能是 当前 View 的 Viewnum 和 0 之外的第三个值吗?)

            1. backup,考虑是否重启

              1. 重启,将 backup 的角色切换到 secondBackup
              2. 不是重启,则不做操作
            2. 闲置的 Server,考虑当前 View 是否有 backup

              1. 有,不做操作(这里是否需要将当前 Server 加入到 secondBackup?
              2. 没有,则将当前 Server 加入到 secondBackup,等待下一次 primary 发送 ping 的时候,提升为 backup

当然,Ping 和 tick 函数 需要考虑加锁的问题,如果只为了通过测试,可以不加锁,测试都是串行的请求(有 goroutine),如果不加锁,可能会遇到很诡异的问题

思路整理之后发现也不是太难,不过过程中还是有不少细节需要注意,如果可以,最好是自己进行思考,然后不断的调试,通过打印日志,思考是否符合自己的理解,然后进行代码的调整

 

2016-03-23
MIT 6.824 2015 Lab 1 记录

===========本文需要有 Go 的基础,并且知道 6.824 Lab 的相关内容作为预备知识===========

最开始做这个 Lab 是去年,所以使用的是 2015 年的(现在已经有 2016 年的了),地址Distributed System

第一个 Lab 是阅读 MapReduce 的论文,然后在提供的框架下实现一个简单版的 MapReduce 程序,论文地址:MapReduce

Part I

在提供的框架下,自己实现 Map 和 Reduce 函数,从而实现单机版的 MapReduce 程序,用来统计单词的数据,类似分布式程序的 Hello World。

根据提供的代码,以及我们在 Part I 执行的语句可知,在 wc.go 中我们找到如下的语句

mapreduce.RunSingle(5, 3, os.Args[2], Map, Reduce)

我们最终执行的是 mapreduce.RunSingle 这个函数,在 RunSingle 函数中,可以分为如下几步

  1. InitMapReduce
  2. Split
  3. DoMap
  4. DoReduce
    其中 InitMapReduce,初始化一个 mapreduce 结构体,在后面使用,Split 则将输入的文件进行,然后顺序调用 DoMap,这里面会调用我们写的 map 函数,DoMap 都做完之后,再继续执行 DoReduce,这个函数会调用我们写的 reduce 函数。然后根据论文中的伪代码,差不多就可以完成这两个函数了
    map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
    EmitIntermediate(w, “1”);
    reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
    result += ParseInt(v);
    Emit(AsString(result));

    Part IIIII

首先查看 test_test.go 中的所有的 test 函数,看是如何实现测试的,大致顺序会形成一张如下的图,从上到下形成调用间的层次,同一层次间的函数执行顺序是从左往右顺序执行,其中绿色的表示是通过 go func()(另起一个线程)来执行的,我们只需要完成 mapreduce.go#RunMaster 函数即可。

part2

从 test 中的代码以及 Part I 中相关代码可以得知,我们需要写的代码(RunMaster 函数),实际上就是把所有的任务(map 或者 reduce)分配给具体的 worker 来执行。

首先,如果我们不考虑 worker 这个概念,那么怎么实现 RunMaster 函数呢,我们只需要把 Part I 中 RunSingle 中的两个 for 循环改成 goroutine 的,也就是在函数 DoMap 和 DoReduce 之前加关键字 go 即可,当然到这里我们还需要考虑,如何做到所有的 map 都完成之后才处理 reduce?reduce 都处理完成才算 RunMaster 函数处理完成?这就变成了 goroutine 的的同步问题了,可以参考 channel buffering。

到这里,如果我们不考虑 worker 的话,所有的 test case 都可以通过了,但是发现 TestBasic 函数的起的 worker 我们根本没有用到(后面几个 test case 还有 worker fail 的情况),那么就变成了,如何将我们上面的代码改写为,使用 worker 来执行,而不是直接通过 go DoMap() 以及 go DoReduce 来执行,通过阅读 worker.go 发现有一个 RPC 接口 DoJob,刚好满足我们的需要,阅读整个项目的其他代码(mapreduce.go#CleanupRegistration()),发现通过调用 common.go#call() 来统一进行 RPC 调用.

这里我们需要知道,从哪知道一个 worker 准备就绪,以及如何知道一个 woker 从忙状态(处理任务)—> 闲状态(任务处理完成),我们可以看到在 worker.go#RunWoker 里面有一句

Register(MasterAddress, me)

我们发现 Register 函数如下

// Tell the master we exist and ready to work
func Register(master string, me string) {
args := RegisterArgs{}
args.Worker = me
var reply RegisterReply
ok := call(master, “MapReduce.Register”, args, reply)
if ok == false {
fmt.Printf(“Register: RPC %s register error\n”, master)
}
}

其中第6行调用 MapReduce.Register 这个 RPC 接口,继续看,发现 mapreduce.go#Register 这个函数中有下面一句话

mr.registerChannel <- args.Worker

发现 registerChannel 是 mapreduce 这个结构体中的一个 channel,也就是在 RunWorker 的时候,我们能从 mr.registerChannel 得到一个标识 worker 的字符串(可以理解为这个 worker 的名字),而这个字符串,后续我们传给 common.go#call 函数,调用相关的 RPC 接口。

好,至少我们知道什么时候会得到通知有 worker 注册了,那么如何知道 worker 从忙变成闲呢,通过上面的流程,我们可以复用 registerChannel,也就是如果一个 worker 处理完任务的时候,我们也往这个 channel 发送 args.Worker 这个字段,这里就需要更改 registerChannel 的定义,因为我们不知道注册 worker 和分配任务给 worker 谁先谁后,在这里我们只需要把 registerChannel 变成带 buffer 的就行了。最终需要处理 worker 中途挂掉的情况,只需要在外层起一个死循环,直到 call 这个 函数返回 true 的时候才退出即可。

总结:

梳理一下:我们在 RunMaster 中需要并行的执行 Map,所有 Map 操作执行完成之后,并行的执行 Reduce 操作,这些操作需要通过分配给 worker 来执行,通过 channel 可以知道什么时候有空闲的 worker(注册或者由忙变闲),然后在调用 Worker.DoJob 的外层用死循环包装一层,知道 RPC 返回成功才退出即可