-
Twitter 新一代流处理利器——heron 论文笔记之storm limitations
19/08/2015
Twitter 新一代流处理利器——Heron 论文笔记之Storm Limitations
标签(空格分隔): Streaming-Processing
## Storm Problems scalability, debug-ability, manageability, and efficient sharing of cluster resources with other data services。
Storm Worker Architecture: Limitations
- Storm的worker就是一个JVM进程,每个worker可以跑多个executor,目前根据Storm现有的调度机制,我们无法确定那个task被分配到了哪个worker上,哪台物理机器上。
- 由于不知道task被分配到哪个worker上,有可能是同一个,考虑join的情况,一个join task和一个output 到 DB Store或其他存储的task被分配到同一个worker,这样性能可能无法保证
- 当前正在跑的topology如果重启的话,之前分派在同一个worker的task由于toplogy重启,可不能不会再被分配到同一个worker上,这给debug带来了困难。
- Storm 提供自己实现的isolate 调度,但是要交于开发人员来分配集群资源是个及其不好的做法。
- 资源分配浪费。Storm假设每个worker都是homogenous,这种做法经常会造成在资源预的超额分配。例如3个spouts和1个bolt,加入每个spout和bolt各自需要5G和10G内存,这样的话,topoogy必须为每个worker预留15G的内存来跑一个spout和一个bolt,如果用户设置worker数为2,那么两个worker就要总共预留30G内存,但是实际上只需要 3*5 + 1 *10 = 25G内存,这样就浪费了5G。
- 如果对一个worker进行heap dump时,可能会阻塞worker hearbeats的发送,导致supervisor认为该worker心跳超时,kill 和重启了该worker
- worker用thread和queue来做tuple的接收和发送,每个worker有一个receive-thread接收上游tuple,一个全局send-thread负责往下游发送tuple,然后executor有一个logic-thread来执行用户的代码逻辑,最后有一个本地的send-thread来做logic-thread和全局send-thread做数据通信,到这里,一个tuple需要从进入一个worker到出来总共要通过4个thread转发。
Issues with the Storm Nimbus
Storm的NImbus任务很多很艰巨,包括调度,监听,分发JAR等等,topology多的时候,Nimbus将变成瓶颈。 > 1. Nimbus调度器不支持worker细粒度的resource reservation和isolation。不同topology的worker被分配到了同一个物理node上,很有可能会相互影响。 > 2. Storm利用Zookeeper来存储worker和supervisor以及executor的心跳信息。如果topology很多,每个topology的并发很多,这样Zookeeper就是瓶颈。 > 3. 就是老生常谈的nimbus单点故障,Nimbus不是HA。
Lack of Backpressure
Storm没有backpressure机制,如果下游接收数据的component没有及时处理数据的话,发送者就会drop message。这是一种fail-fast机制,也很简单,但是有以下缺点:
- If acknowledgements are disabled, this mechanism will resultin unbounded tuple drops, making it hard to get visibility about these drops.
- Work done by upstream components is lost.
- System behavior becomes less predictable.
Efficiency
- Suboptimal replays
- Long Garbage Collection cycles
- Queue contention
未完待续,下次讲述Twitter的新利器——Heron的架构以及是如何解决上述Storm存在的问题的。
Reference
-
Esper调优
15/08/2015
Optimization
- Esper跑在JVM上,需要tuning jvm , 使用time window或batch时,占用内存需要考虑一下
- The processing of output events that your listener or subscriber performs temporarily blocks the thread until the processing completes, and may thus reduce throughput. 3.
-
关于分布式程序 java的内存管理浅谈
10/08/2015
关于分布式程序 java的内存管理浅谈
标签(空格分隔): 分布式 内存管理 java
Preface
当前全球正处于互联网时代,是个信息大爆炸时代。对于商家来说,每一天信息都是宝贵的,都可以转换成money的。所以对数据的处理要求也变的越来越严格,从以前的hadoop/MapReduce 的离线处理,到现在的准实时和实时处理,都是由数据需求而引起的技术革命。数据的处理快慢取决于很多因素。现在主流的解决方法,像Spark,Flink,Pular,包括腾讯,阿里,百度的诸多为开源的框架都是基于分布式内存的大数据处理计算框架,所以内存的使用变的很关键。由于各个框架都是基于JVM语言开发的,所以JVM的内存管理问题被提上日程。
Java AutoBoxing and UnBoxing
提到Java大家都会想到AutoBoxing和UnBoxing,这项技术最早是在jdk1.5引入进来的。AutoBoxing是java编译器的自动转换过程,针对primitive和与之对应的object wrapper类之间的自动转换,例如把int转换为Integer,double转换为Double等等。反之呐就是Unboxing。 虽然Boxing技术给开发人员带来了很多方便,但是也带来了问题——内存的问题
所有的primitive 的Object 类型都是继承于Object类的,如果调用new Object(), object没有任何其他可存储的成员,那么它也会占用8个字节的内存。
大家都知道C语言中字节是4字节对齐的,那么java也是字节对齐的,不过是8字节对齐的。类的成员变量声明的先后顺序也能影响该类所占的内存大小。
在现今的大数据处理时代,为了能够提高实时性,很多数据直接在内存中不落地的,所以内存占用很大,也会造成一定效率问题。
这个autoboxing的问题很早之前就已经有人提出来了,有个开源的lib叫fastutil是针对java的autoboxing和unboxing做了优化的。读者可以自行下载源码看看。
Programming based on Memory
上学时图书馆有本书貌似叫《Programming based on Limited Memory》,内容讲的是在移动设备上编程,很老的一本书,那个时候的移动设备内存最大才1M,一般都是512K居多,现在这个Programming based on Limited Memory概念仍然是需要的。
之前Spark提出来基于内存RDD的计算模式,在其documents上对于Memory Tuning有几条建议: > There are three considerations in tuning memory usage: the amount of memory used by your objects (you may want your entire dataset to fit in memory), the cost of accessing those objects, and the overhead of garbage collection (if you have high turnover in terms of objects). >
> By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space than the “raw” data inside their fields. This is due to several reasons: » - Each distinct Java object has an “object header”, which is about 16 bytes and contains information such as a pointer to its class. For an object with very little data in it (say one Int field), this can be bigger than the data. » - Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length), and store each character as two bytes due to String’s internal usage of UTF-16 encoding. Thus a 10-character string can easily consume 60 bytes. » - Common collection classes, such as HashMap and LinkedList, use linked data structures, where there is a “wrapper” object for each entry (e.g. Map.Entry). This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list. » - Collections of primitive types often store them as “boxed” objects such as java.lang.Integer.还有一点关于In-Process Memory 和Distributed Memory的对比:
- Consistency > - In-Process Memory: While using an in-process cache, your cache elements are local to a single instance of your application. Many medium-to-large applications, however, will not have a single application instance as they will most likely be load-balanced. In such a setting, you will end up with as many caches as your application instances, each having a different state resulting in inconsistency. State may however be eventually consistent as cached items time-out or are evicted from all cache instances. > - Distributed caches, although deployed on a cluster of multiple nodes, offer a single logical view (and state) of the cache. In most cases, an object stored in a distributed cache cluster will reside on a single node in a distributed cache cluster. By means of a hashing algorithm, the cache engine can always determine on which node a particular key-value resides. Since there is always a single state of the cache cluster, it is never inconsistent. > - Commnets If you are caching immutable objects, consistency ceases to be an issue. In such a case, an in-process cache is a better choice as many overheads typically associated with external distributed caches are simply not there. If your application is deployed on multiple nodes, you cache mutable objects and you want your reads to always be consistent rather than eventually consistent, a distributed cache is the way to go.
2.Overheads » - In-Process Memory, This dated but very descriptive article describes how an in-process cache can negatively effect performance of an application with an embedded cache primarily due to garbage collection overheads. Your results however are heavily dependent on factors such as the size of the cache and how quickly objects are being evicted and timed-out. » - Distributed caches, A distributed cache will have two major overheads that will make it slower than an in-process cache (but better than not caching at all): network latency and object serialization » - Commnets , As described earlier, if you are looking for an always-consistent global cache state in a multi-node deployment, a distributed cache is what you are looking for (at the cost of performance that you may get from a local in-process cache).
3.Reliability > - In-Process Memory, An in-process cache makes use of the same heap space as your program so one has to be careful when determining the upper limits of memory usage for the cache. If your program runs out of memory there is no easy way to recover from it. > - Distributed caches,A distributed cache runs as an independent processes across multiple nodes and therefore failure of a single node does not result in a complete failure of the cache. As a result of a node failure, items that are no longer cached will make their way into surviving nodes on the next cache miss. Also in the case of distributed caches, the worst consequence of a complete cache failure should be degraded performance of the application as opposed to complete system failure.
> - Commnets, An in-process cache seems like a better option for a small and predictable number of frequently accessed, preferably immutable objects. For large, unpredictable volumes, you are better off with a distributed cache.摘自 In-Process Caching vs. Distributed Caching
Open Project Solutions
-
Apache Flink 利用Java的Unsafe API来自行管理程序内存 详细请看 Juggling with Bits and Bytes,代码可以去github上去clone一下,关于内存管理这块在flink-runtime module里
-
开源的Lib,fastutil: http://fastutil.di.unimi.it/
-
Facebook Presto: Slice(这是facebook利用github上的一个牛人开源项目) github
上述如有不对的地方还请指正,小弟将不胜感激。 转载请注明出处,谢谢
Reference
-
Elasticsearch 相关技术blog
07/08/2015
Elasticsearch 相关技术blog
-
Logstash 学习小记
02/08/2015
# logstash 学习小记
标签(空格分隔): 日志收集
## Introduce Logstash is a tool for managing events and logs. You can use it to collect logs, parse them, and store them for later use (like, for searching). – http://logstash.net
自从2013年logstash被ES公司收购之后,ELK stask正式称为官方用语,很多公司都开始ELK实践,我们也不例外,借用新浪是如何分析处理32亿条实时日志的?的一张图 ![此处输入图片的描述][1] 这是一个再常见不过的架构了: (1)Kafka:接收用户日志的消息队列。 (2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。 (3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。 (4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。
但是众多log 收集framwork,像flume,scribe,fluent,为什么选用logstash呢?
原因很简单:
- 部署启动很容易,只需要有jdk就OK了
- 配置简单,无需编码
- 支持收集log路径的正则表达式,不像flume那样必须写死要收集的文件名,logstash不是,像这样 > path => [“/var/log/.log”]
有个Flume VS Fluentd VS Logstash可以看看 [1]: http://dockerone.com/uploads/article/20150715/26d7dde91a3f96c858a5f4157ade2bf2.png
Logstash Examples
logstash事件处理流程氛围三个stages:input ,filter,output。input支持很多,如file,redis,kafka等等,filter主要是对input的log进行自己想要的处理,output则是输出到你要存储log的第三方framework,如kafka,redis,elasticsearch,db什么的,具体的查看官网。 废话不多说,开始例子: 1. 最最简单的例子 input和output都是标准输入输出 ``` shell [joeywen@192 logstash]$ bin/logstash -e ‘input { stdin { } } output { stdout {}}’
Logstash startup completed >hello world ## 输入的内容 >2015-08-02T05:26:55.564Z joeywens-MacBook-Pro.local hello world ## logstash收集的内容 ```
- 编写config文件 ```config input { file { path => [“/var/log/.log”] type => “syslog” codec => multiline { pattern => “(^\d+\serror)|(^.+Exception:.+)|(^\s+at .+)|(^\s+… \d+ more)|(^\sCausedby:.+)” what => “previous” } } }
output { stdout { codec => rubydebug } # elasticsearch { # host => ‘localhost’ # protocol => ‘transport’ # cluster => ‘elasticsearch’ # index => ‘logstash-joeymac-%{+YYYY.MM.dd}’ # } } ``` 输入是file形式,收集系统日志,如果有异常发生,通常异常会多行,这里用codec => multiline 来对出现异常的多行转换为一行输入
输出就是ES,或者你也可以把stdout作为调试打开看看,输出的是什么内容。运行命令如下以及输出
```shell [joeywen@192 logstash]$ bin/logstash -f sys.conf Logstash startup completed
{ “@timestamp” => “2015-08-02T05:36:08.972Z”, “message” => “Aug 2 13:36:08 joeywens-MacBook-Pro.local GoogleSoftwareUpdateAgent[1976]: 2015-08-02 13:34:51.764 GoogleSoftwareUpdateAgent[1976/0xb029b000] [lvl=2] -[KSUpdateEngine(PrivateMethods) updateFinish] KSUpdateEngine update processing complete.”, “@version” => “1”, “host” => “joeywens-MacBook-Pro.local”, “path” => “/var/log/system.log”, “type” => “syslog” } { “@timestamp” => “2015-08-02T05:36:08.973Z”, “message” => “Aug 2 13:36:08 joeywens-MacBook-Pro.local GoogleSoftwareUpdateAgent[1976]: 2015-08-02 13:36:08.105 GoogleSoftwareUpdateAgent[1976/0xb029b000] [lvl=3] -[KSAgentUploader fetcher:failedWithError:] Failed to upload stats to <NSMutableURLRequest https://tools.google.com/service/update2> with error Error Domain=NSURLErrorDomain Code=-1001 "The request timed out." UserInfo=0x3605f0 {NSErrorFailingURLStringKey=https://tools.google.com/service/update2, _kCFStreamErrorCodeKey=60, NSErrorFailingURLKey=https://tools.google.com/service/update2, NSLocalizedDescription=The request timed out., _kCFStreamErrorDomainKey=1, NSUnderlyingError=0x35fd30 "The request timed out."}”, “@version” => “1”, “host” => “joeywens-MacBook-Pro.local”, “path” => “/var/log/system.log”, “type” => “syslog” } { “@timestamp” => “2015-08-02T05:36:08.973Z”, “message” => “Aug 2 13:36:08 joeywens-MacBook-Pro.local GoogleSoftwareUpdateAgent[1976]: 2015-08-02 13:36:08.272 GoogleSoftwareUpdateAgent[1976/0xb029b000] [lvl=3] -[KSAgentApp uploadStats:] Failed to upload stats <KSStatsCollection:0x4323e0 path="/Users/joeywen/Library/Google/GoogleSoftwareUpdate/Stats/Keystone.stats", count=6, stats={“, “@version” => “1”, “host” => “joeywens-MacBook-Pro.local”, “path” => “/var/log/system.log”, “type” => “syslog” } ``` 如果想添加或删除字段,该怎么办?filter就该登场了
- filter filter的功能十分强大,可以对input的内容做任何更改,input的内容会转换为一个叫event的map,里面存放着key/value对,正如你所看到的输出一样,@timestamp,type,@version, host,message等等,都是event里面的key,在filter里面你可以启动ruby 编程plugin对其做任何更改 如: ``` input { file { path => [“/var/log/.log”] type => “syslog” codec => multiline { pattern => “(^\d+\serror)|(^.+Exception:.+)|(^\s+at .+)|(^\s+… \d+ more)|(^\sCausedby:.+)” what => “previous” } } }
filter {
if [type] =~ /^syslog/ { ruby { code => "file_name = event['path'].split('/')[-1] event['file_name'] = file_name" } } }
output { stdout { codec => rubydebug } }
如上我对type已syslog开头的event做更改,调用ruby编程 看看输出
[joeywen@192 logstash]$ bin/logstash -f sys.conf Logstash startup completed { “@timestamp” => “2015-08-02T05:46:52.771Z”, “message” => “Aug 2 13:46:40 joeywens-MacBook-Pro.local Dock[234]: CGSConnectionByID: 0 is not a valid connection ID.”, “@version” => “1”, “host” => “joeywens-MacBook-Pro.local”, “path” => “/var/log/system.log”, “type” => “syslog”, “file_name” => “system.log” }可以看到多了个file_name的字段, 如果相对message做解析的话,需要调用grok plugin来做,grok是很强大插件,例如
input { file { path => “/var/log/http.log” } } filter { grok { patterns_dir => [“/opt/logstash/patterns”, “/opt/logstash/extra_patterns”] match => { “message” => “%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}” } } } ``` 对于message字段调用正则匹配,语法是%{SYNTAX:SEMANTIC} 第一个SYNTAX是正则表达式名称,第二个是对于匹配成功的字段取名字,这些SYNTAX存在指定的pattern_dir目录下的文件,格式是: >NAME PATTERN >如 NUMBER \d+也可以使用mutate来最event的key和value做更改,包括remove,add,update,rename 等等,具体的都可以看看(logstash文档)[https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html]
这里给个具体的例子吧 配置: ``` input { file { path => [“/var/log/.log”] type => “syslog” codec => multiline { pattern => “(^\d+\serror)|(^.+Exception:.+)|(^\s+at .+)|(^\s+… \d+ more)|(^\sCausedby:.+)” what => “previous” } } }
filter {
if [type] =~ /^syslog/ { ruby { code => "file_name = event['path'].split('/')[-1] event['file_name'] = file_name" } grok { patterns_dir => ["./patterns/*"] match => {"message" => "%{MAC_BOOK:joeymac}"} } mutate { rename => {"file_name" => "fileName"} add_field => {"foo_%{joeymac}" => "Hello world, from %{host}"} } } }
output { stdout { codec => rubydebug } }
输出
[joeywen@192 logstash]$ bin/logstash -f sys.conf Logstash startup completed { “@timestamp” => “2015-08-02T06:10:13.161Z”, “message” => “Aug 2 14:10:12 joeywens-MacBook-Pro com.apple.xpc.launchd[1] (com.apple.quicklook[2206]): Endpoint has been activated through legacy launch(3) APIs. Please switch to XPC or bootstrap_check_in(): com.apple.quicklook”, “@version” => “1”, “host” => “joeywens-MacBook-Pro.local”, “path” => “/var/log/system.log”, “type” => “syslog”, “joeymac” => “joeywens-MacBook-Pro”, “fileName” => “system.log”, “foo_joeywens-MacBook-Pro” => “Hello world, from joeywens-MacBook-Pro.local” } ```