-
Why is apache spark implemented in scala
08/10/2015
# Why is Apache Spark implemented in Scala?
标签(空格分隔): 未分类
By:Matei Zaharia, CTO @ Databricks When we started Spark, we wanted it to have a concise API for users, which Scala did well. At the same time, we wanted it to be fast (to work on large datasets), so many scripting languages didn’t fit the bill. Scala can be quite fast because it’s statically typed and it compiles in a known way to the JVM. Finally, running on the JVM also let us call into other Java-based big data systems, such as Cassandra, HDFS and HBase.
Since we started, we’ve also added APIs in Java (which became much nicer with Java 8) and Python.
-
分享一个python版的sql select语句解析代码
05/09/2015
分享一个python版的SQL Select语句解析代码
标签(空格分隔): python sqlparser
```python # select_parser.py # Copyright 2010, Paul McGuire # # a simple SELECT statement parser, taken from SQLite’s SELECT statement # definition at http://www.sqlite.org/lang_select.html # from pyparsing import *
LPAR,RPAR,COMMA = map(Suppress,”(),”) select_stmt = Forward().setName(“select statement”)
keywords
(UNION, ALL, AND, INTERSECT, EXCEPT, COLLATE, ASC, DESC, ON, USING, NATURAL, INNER, CROSS, LEFT, OUTER, JOIN, AS, INDEXED, NOT, SELECT, DISTINCT, FROM, WHERE, GROUP, BY, HAVING, ORDER, BY, LIMIT, OFFSET) = map(CaselessKeyword, “"”UNION, ALL, AND, INTERSECT, EXCEPT, COLLATE, ASC, DESC, ON, USING, NATURAL, INNER, CROSS, LEFT, OUTER, JOIN, AS, INDEXED, NOT, SELECT, DISTINCT, FROM, WHERE, GROUP, BY, HAVING, ORDER, BY, LIMIT, OFFSET””“.replace(“,”,””).split()) (CAST, ISNULL, NOTNULL, NULL, IS, BETWEEN, ELSE, END, CASE, WHEN, THEN, EXISTS, COLLATE, IN, LIKE, GLOB, REGEXP, MATCH, ESCAPE, CURRENT_TIME, CURRENT_DATE, CURRENT_TIMESTAMP) = map(CaselessKeyword, “"”CAST, ISNULL, NOTNULL, NULL, IS, BETWEEN, ELSE, END, CASE, WHEN, THEN, EXISTS, COLLATE, IN, LIKE, GLOB, REGEXP, MATCH, ESCAPE, CURRENT_TIME, CURRENT_DATE, CURRENT_TIMESTAMP””“.replace(“,”,””).split()) keyword = MatchFirst((UNION, ALL, INTERSECT, EXCEPT, COLLATE, ASC, DESC, ON, USING, NATURAL, INNER, CROSS, LEFT, OUTER, JOIN, AS, INDEXED, NOT, SELECT, DISTINCT, FROM, WHERE, GROUP, BY, HAVING, ORDER, BY, LIMIT, OFFSET, CAST, ISNULL, NOTNULL, NULL, IS, BETWEEN, ELSE, END, CASE, WHEN, THEN, EXISTS, COLLATE, IN, LIKE, GLOB, REGEXP, MATCH, ESCAPE, CURRENT_TIME, CURRENT_DATE, CURRENT_TIMESTAMP))
identifier = ~keyword + Word(alphas, alphanums+”_”) collation_name = identifier.copy() column_name = identifier.copy() column_alias = identifier.copy() table_name = identifier.copy() table_alias = identifier.copy() index_name = identifier.copy() function_name = identifier.copy() parameter_name = identifier.copy() database_name = identifier.copy()
expression
expr = Forward().setName(“expression”)
integer = Regex(r”[+-]?\d+”) numeric_literal = Regex(r”\d+(.\d*)?([eE][+-]?\d+)?”) string_literal = QuotedString(“’”) blob_literal = Combine(oneOf(“x X”) + “’” + Word(hexnums) + “’”) literal_value = ( numeric_literal | string_literal | blob_literal | NULL | CURRENT_TIME | CURRENT_DATE | CURRENT_TIMESTAMP ) bind_parameter = ( Word(“?”,nums) | Combine(oneOf(“: @ $”) + parameter_name) ) type_name = oneOf(“TEXT REAL INTEGER BLOB NULL”)
expr_term = ( CAST + LPAR + expr + AS + type_name + RPAR | EXISTS + LPAR + select_stmt + RPAR | function_name + LPAR + Optional(delimitedList(expr)) + RPAR | literal_value | bind_parameter | identifier )
UNARY,BINARY,TERNARY=1,2,3 expr « operatorPrecedence(expr_term, [ (oneOf(‘- + ~’) | NOT, UNARY, opAssoc.LEFT), (‘||’, BINARY, opAssoc.LEFT), (oneOf(‘* / %’), BINARY, opAssoc.LEFT), (oneOf(‘+ -‘), BINARY, opAssoc.LEFT), (oneOf(‘« » & |’), BINARY, opAssoc.LEFT), (oneOf(‘< <= > >=’), BINARY, opAssoc.LEFT), (oneOf(‘= == != <>’) | IS | IN | LIKE | GLOB | MATCH | REGEXP, BINARY, opAssoc.LEFT), (‘||’, BINARY, opAssoc.LEFT), ((BETWEEN,AND), TERNARY, opAssoc.LEFT), ])
compound_operator = (UNION + Optional(ALL) INTERSECT EXCEPT) ordering_term = expr + Optional(COLLATE + collation_name) + Optional(ASC DESC) join_constraint = Optional(ON + expr USING + LPAR + Group(delimitedList(column_name)) + RPAR) join_op = COMMA (Optional(NATURAL) + Optional(INNER CROSS LEFT + OUTER LEFT OUTER) + JOIN) join_source = Forward() single_source = ( (Group(database_name(“database”) + “.” + table_name(“table”)) | table_name(“table”)) + Optional(Optional(AS) + table_alias(“table_alias”)) + Optional(INDEXED + BY + index_name(“name”) | NOT + INDEXED)(“index”) | (LPAR + select_stmt + RPAR + Optional(Optional(AS) + table_alias)) | (LPAR + join_source + RPAR) )
join_source « single_source + ZeroOrMore(join_op + single_source + join_constraint)
result_column = “” | table_name + “.” + “” | (expr + Optional(Optional(AS) + column_alias)) select_core = (SELECT + Optional(DISTINCT | ALL) + Group(delimitedList(result_column))(“columns”) + Optional(FROM + join_source) + Optional(WHERE + expr(“where_expr”)) + Optional(GROUP + BY + Group(delimitedList(ordering_term)(“group_by_terms”)) + Optional(HAVING + expr(“having_expr”))))
select_stmt « (select_core + ZeroOrMore(compound_operator + select_core) + Optional(ORDER + BY + Group(delimitedList(ordering_term))(“order_by_terms”)) + Optional(LIMIT + (integer + OFFSET + integer | integer + COMMA + integer)))
tests = “””\ select * from xyzzy where z > 100 select * from xyzzy where z > 100 order by zz select * from xyzzy””“.splitlines() for t in tests: print t try: print select_stmt.parseString(t).dump() except ParseException, pe: print pe.msg print
```
测试环境: >python 2.7.5 >pyparsing 2.03
运行结果:
select * from xyzzy where z > 100 ['SELECT', ['*'], 'FROM', 'xyzzy', 'WHERE', ['z', '>', '100']] - columns: ['*'] - table: ['xyzzy'] - where_expr: ['z', '>', '100'] select * from xyzzy where z > 100 order by zz ['SELECT', ['*'], 'FROM', 'xyzzy', 'WHERE', ['z', '>', '100'], 'ORDER', 'BY', ['zz']] - columns: ['*'] - order_by_terms: ['zz'] - table: ['xyzzy'] - where_expr: ['z', '>', '100'] select * from xyzzy ['SELECT', ['*'], 'FROM', 'xyzzy'] - columns: ['*'] - table: ['xyzzy']
-
Win git error init_cheap Virtualalloc pointer is null, win32 error 487
01/09/2015
win git error init_cheap:VirtualAlloc pointer is null, Win32 error 487
标签(空格分隔): git
在idea利用git进行代码更新时遇到的问题,google了一下,早StackOverflow找到解决办法,在此share一下
Error message
E:\vipshop\storm-sql>git pull origin joeywen 0 [main] us 0 init_cheap: VirtualAlloc pointer is null, Win32 error 487 AllocationBase 0x0, BaseAddress 0x68570000, RegionSize 0x2F0000, State 0x10000 C:\Program Files (x86)\Git\bin\sh.exe: *** Couldn't reserve space for cygwin's heap, Win32 error 0
原因分析:
Cygwin uses persistent shared memory sections, which can on occasion become corrupted. The symptom of this is that some Cygwin programs begin to fail, but other applications are unaffected. Since these shared memory sections are persistent, often a reboot is needed to clear them out before the problem can be resolved.
解决办法:
I had the same problem. I found solution here http://jakob.engbloms.se/archives/1403
c:\msysgit\bin>rebase.exe -b 0x50000000 msys-1.0.dll
For me solution was slightly different. It wasC:\Program Files (x86)\Git\bin>rebase.exe -b 0x50000000 msys-1.0.dll
Before you rebase dlls, you should make sure it is not in use:tasklist /m msys-1.0.dll
If the rebase command fails with something like: >ReBaseImage (msys-1.0.dll) failed with last error = 6You will need to perform the following steps in order: >-1. Copy the dll to another directory >-2.Rebase the copy using the commands above >-3.Replace the original dll with the copy.
参考:Git Extensions: Win32 error 487: Couldn’t reserve space for cygwin’s heap, Win32 error 0
-
Storm杂谈之acker拾趣
27/08/2015
Storm杂谈之Acker拾趣
本文所讲内容并非storm的acker机制,如果想看acker机制的让您失望了,不过在此奉上徐明明大牛的blog: Twitter Storm源代码分析之acker工作流程 Twitter Storm如何保证消息不丢失
或者查看《storm源码分析》(又给京狗打链接)第12章-storm的acker系统,里面会详细说明storm的acker机制,笔者在此就不多述(多述都是废话,还不一定有人家讲的好)了。
这篇主要讲一下,关于开acker和不开acker的区别。 首先说一下,BasicBolt和RichBolt的区别,RichBolt会帮我们自动ack tuple的,basicbolt不会,所以如果继承的是basicBolt的话,就需要自己outputcollecter调ack方法了。
一般Storm有个配置项
/** * How many executors to spawn for ackers. * * <p>If this is set to 0, then Storm will immediately ack tuples as soon * as they come off the spout, effectively disabling reliability.</p> */ public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
该配置项是配置Acker Bolt数目的,大于0则spout每发一条msg,都会把相应的信息<rootid, msgid>发送给Acker Bolt进行跟踪。AckerBolt跟踪这个消息进行可靠性处理。但是如果TOPOLOGY_ACKER_EXECUTORS配置为0的话,bolt和spout之间的ack又会是怎样的呐??
先看下面的executor.clj中(defmethod mk-threads :spout [executor-data task-datas]方法的部分代码
pending (RotatingMap. 2 ;; microoptimize for performance of .size method (reify RotatingMap$ExpiredCallback (expire [this msg-id [task-id spout-id tuple-info start-time-ms]] (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta) )))) tuple-action-fn (fn [task-id ^TupleImpl tuple] (let [stream-id (.getSourceStreamId tuple)] (condp = stream-id Constants/SYSTEM_TICK_STREAM_ID (.rotate pending) Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple) (let [id (.getValue tuple 0) [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)] (when spout-id (when-not (= stored-task-id task-id) (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id)) (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] (condp = stream-id //这里,根据stream id 来对msg进行ack或fail ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id) spout-id tuple-finished-info time-delta) ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-finished-info time-delta) ))) ;; TODO: on failure, emit tuple to failure stream )))) receive-queue (:receive-queue executor-data) event-handler (mk-task-receiver executor-data tuple-action-fn) has-ackers? (has-ackers? storm-conf) emitted-count (MutableLong. 0) empty-emit-streak (MutableLong. 0)
再看acker.clj的execute代码就清晰多了,acker也是根据stream-id来对tuple的msgid进行异或处理的,(^void execute [this ^Tuple tuple] (let [^RotatingMap pending (.getObject pending) stream-id (.getSourceStreamId tuple)] (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) (.rotate pending) (let [id (.getValue tuple 0) ^OutputCollector output-collector (.getObject output-collector) curr (.get pending id) curr (condp = stream-id ACKER-INIT-STREAM-ID (-> curr (update-ack (.getValue tuple 1)) (assoc :spout-task (.getValue tuple 2))) ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1)) ACKER-FAIL-STREAM-ID (assoc curr :failed true))] (.put pending id curr) (when (and curr (:spout-task curr)) (cond (= 0 (:val curr)) (do (.remove pending id) (acker-emit-direct output-collector (:spout-task curr) ACKER-ACK-STREAM-ID [id] )) (:failed curr) (do (.remove pending id) (acker-emit-direct output-collector (:spout-task curr) ACKER-FAIL-STREAM-ID [id] )) )) (.ack output-collector tuple) )))) (^void cleanup [this] )
那么bolt是怎样调用acker的呐,上面说了,继承richbolt时会自动调用ack方法,那么ack方法到底做了哪些事情呐,按executor.clj中的mk-threads :bolt部分代码就清晰多了(.prepare bolt-obj storm-conf user-context (OutputCollector. //实现了匿名内部类,实现IOutputCollector的emit,emitDirect,ack和fail方法 (reify IOutputCollector (emit [this stream anchors values] (bolt-emit stream anchors values nil)) (emitDirect [this task stream anchors values] (bolt-emit stream anchors values task)) (^void ack [this ^Tuple tuple] (let [^TupleImpl tuple tuple ack-val (.getAckVal tuple)] (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] (task/send-unanchored task-data ACKER-ACK-STREAM-ID [root (bit-xor id ack-val)]) )) (let [delta (tuple-time-delta! tuple)] (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when delta (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data) executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta) (stats/bolt-acked-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) delta)))) (^void fail [this ^Tuple tuple] (fast-list-iter [root (.. tuple getMessageId getAnchors)] (task/send-unanchored task-data ACKER-FAIL-STREAM-ID [root])) (let [delta (tuple-time-delta! tuple)] (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when delta (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data) executor-stats (.getSourceComponent tuple)
从上面可以看出,prepare方法中实现了匿名内部类,实现IOutputCollector的emit,emitDirect,ack和fail方法,然后把OutputCollector传给用户所实现的bolt,即bolt重载prepare方法中的OutputCollector,当用利用OutputCollector调用ack方法时就是调用上面clojure代码中匿名实现的ack方法,该方法会发送一个ACKER-ACK-STREAM-ID 的streamid,然后Spout就可以接收到该stream进行ack操作,这也是你在WebUI上看到spout后面的ack count数目了。总结一下
- 开了Acker,只是对发送的消息进行跟踪处理,处理成功或失败会发送stream给spout,spout回调ack或fail方法同时进行metric统计,然后具体的重发逻辑用户可以自己定义。
- 不开Acker, Bolt调用ack方法也会发送stream给spout,spout接收stream做metric统计。
Spout只是根据Ack stream id来判断是否进行ack或fail,跟具体哪个bolt没有任何关系。所以大家不要认为storm的消息可靠性是来源spout和acker bolt之间的通信,其实不然,Bolt也是会发消息给spout的。
OK,到此结束。 PS:storm版本是0.9.4
-
Twitter 新一代流处理利器——heron 论文笔记之heron架构
20/08/2015
Twitter 新一代流处理利器——Heron 论文笔记之Heron架构
标签(空格分隔): Streaming-process realtime-process
Heron Architecture
Heron 架构如下图:
用户编写发布topoloy到Aurora调度器。每一个topology都作为一个Aurora的job在运行。每一个job包括几个container,这些container由Aurora来分配和调度。第一个container作为Topology Master,其他的Container作为Stream Manager。所有的元数据信息包括谁提交的job,job的运行信息,启动时间等等都会保存在Zookeeper中。 每个Heron Instance都是用java写的,且都是JVM进程。Heron进程之间用protocol buffers进行通信。
Heron Instance
值得一提的是每个HI都是只跑一个task,即要么是spout要么是bolt。这样有利于debug。 这种设计也为以后数据的复杂性考虑,当以后数据复杂性变高的时候,我们还可以考虑用其他语言来实现HI。
HI的设计有以下两种:
- 单线程
- 双线程 ### Single-threaded approach 主线程有一个TCP channel与本地的SM通信,等待tuple的到来,一旦tuple来了,就会调用用户的逻辑代码来执行,如果需要输出,该线程就会缓存数据,直到达到阈值,然后输出到downstream的SM。
这种设计简单,但是也有一些缺点,由于某些原因,用户的逻辑可能被block:
- Invoking the sleep system call for a finite duration of time
- Using read/write system calls for file or socket I/O
- Calling thread synchronization primitives
Two-threaded approach
顾名思义,两个thread:Gateway thread 和Task Execution thread,如下图:
Gateway thread负责数据的输入输出和通信 Task Execution thread则负责运行用户逻辑代码
Gateway thread要和Task Execution thread要进行数据通信,他们之间通过如上图的三种queue来通信。Gateway thread用data-in往Task Execution thread输入数据,Task Execution thread用data-out往Gateway thread,metrics-out是用Task Execution thread用来收集metric然后往Gateway thread发送。
Toplogy Master
TM(Topology Master)主要负责topology的throughout,在startup的时候,TM把信息存放在Zookeeper上,以便其他进程能够发现TM。所以TM有如下两个目的:
- 阻止多个TM的产生
- 允许其他属于该topology的进程发现该TM
Topology Backpressure
Heron提供一种背压机制来动态调整数据流动的速率。这种机制可以让topology中的各个components以不同speed来跑。也可以动态更改它的speed。
TCP Backpressure
这个策略利用TCP窗口的机制来梳理HI(Heron Instance)和其他Componet的背压。所有的消息都是通过TCP sockets来做通信,如果某个HI处理缓慢,那么它的本地接收buffer就会被装满,在这个HI上游和数据通信的SM也会发现这个然后填满发送的buffer,这样该HI的处理速度就加快了。
Spout backpressure
这个背压策略是和TCP背压策略协同使用的,当SM发现它本地的HI运行慢时,SM就会通知本地的SPout停止读取数据,那么往该Spout发送数据的SM的buffer就会阻塞以致fill up,这是受影响的SM就会发送一条start backpressure的msg到其他与之相连的SM,当其他SM收到该msg时就会告诉他们本地的Spout不再读取数据,当上游缓慢的HI速度赶上来之后,SM再发一个stop backpressure的msg到下游,然后停止backpressure。
当topoloy处于backpressure模式时,它的运行速度取决于最慢的那个HI。
Architecture Features: Summary
- First, the provisioning of resources (e.g. for containers and even the Topology Master) is cleanly abstracted from the duties of the cluster manager, thereby allowing Heron to “play nice” with the rest of the (shared) infrastructure.
- Second, since each Heron Instance is executing only a single task (e.g. running a spout or bolt), it is easy to debug that instance by simply using tools like jstack and heap dump with that process.
- Third, the design makes it transparent as to which component of the topology is failing or slowing down, as the metrics collection is granular, and lets us easily map an issue unambiguously to a specific process in the system.
- Fourth, by allowing component-level resource allocation, Heron allows a topology writer to specify exactly the resources for each component, thereby avoiding unnecessary over-provisioning.
- Fifth, having a Topology Master per topology allows each topology to be managed independently of each other (and other systems in the underlying cluster). In additional, failure of one topology (which can happen as user-defined code often gets run in the bolts) does not impact the other topologies.
- Sixth, the backpressure mechanism allows us to achieve a consistent rate of delivering results, and a precise way to reason about the system. It is also a key mechanism that allows migrating topologies from one set of containers to another (e.g. to an upgraded set of machines).
- Finally, we now do not have any single point of failure.
Performance
直接看图吧
Previous Página 1 de 3 Next