How to Build a Stream Database
Theodore Johnson, AT&T Labs - Research

What is a stream database?
- Query data from a stream
  - A data feed with a schema
  - You can also query conventional relations
- Examples
  - Sensor data
  - Stock market quotes
  - Network monitoring data
- Querying a stream forces some changes to the DBMS:
  - Must use push-based rather than pull-based operators
  - Must be able to provide partial answers, e.g., you never finish the query
  - One pass, e.g., you cannot (in general) rewind the stream

Stream Databases for Network Measurements
- Continuing need to measure and monitor networks:
  - Router configuration
  - Debugging
  - Detecting network attacks
  - Verifying service agreements
  - ...
- Very large amounts of data
  - In principle, we'd like to query every packet flowing in the network
  - And in real time
- Data arrives in streams
  - IP streams, NetFlow streams, SNMP streams, ...
- Special queries: grouping by subsequences
  - IP packets forming a flow, forming a TCP/IP session, forming a user's interactions, ...

Query Language
- Typical queries:
  - For each source IP address and each 5-minute interval, count the number of bytes and the number of packets related to HTTP transfers
  - Find the TCP/IP SYN packets with and without matching FIN packets
  - Compute the NetFlows in the packet stream, using a 30-second timeout between packets
- Pervasive use of time and sequence
- We would like to express these queries with minimal changes to SQL
- We will rely on the query optimizer to make use of the ordering properties of the data streams

Basics
- Selection, projection, join, group-by, aggregation, etc.
- Mix streams with tables
- Some restrictions ensure that we can answer the query in limited space:
  - Join: when joining streams, the join predicate must define a window in which the join must occur, e.g., match SYN packets on an inbound link with SYNACK packets on an outbound link
  - Group-by and aggregation: we must be able to determine when all tuples for a group have been processed, e.g., the number of packets during each 30-second interval (more on this later)

Complex Aggregation
- Grouping variables
  - Analogous to table variables
  - Represent the value of a correlated subquery
  - Only aggregate values can be referenced
- Example:

    Select SourceIP, tb, (count(*)+count(X)/2+count(Y)/4)/1.75
    From Packets
    Group By SourceIP, ts/60, ts/60+1, ts/60+2 as tb, X, Y
    Such that X.SourceIP = SourceIP and X.ts/60+1 = tb
              Y.SourceIP = SourceIP and Y.ts/60+2 = tb

- X represents the query

    Select * from Packets where SourceIP = $SourceIP and ts/60+1 = $tb

Defining Sequences
- Count the packets in connection K between the SYN packet and the FIN packet:

    Select K, ts, count(Y)
    From TCPIP
    Where SYN = 1
    Group by K, ts : X, Y
    Such That X.K = K and X.ts > ts and X.FIN = 1
              Y.K = K and Y.ts >= ts and Y.ts <= MIN(X.ts)

Ordering Properties
- The query language lets us express queries that seem to require self-joins, etc.
- But the queries frequently have a temporal component: timestamps as group-by variables, timestamps in the join predicates, etc.
- If we can reason about timestamps, we can find a stream evaluation plan for these queries
  - But not for all of them
- We want to avoid cumbersome model restrictions, e.g., sequence databases
- We want precise semantics, e.g., avoid “continuous query” models

Temporal Properties
- Define ordering properties on attributes of a stream
- Allow for multiple ordering properties, e.g., multiple timestamps, start time vs. end time, timestamp vs. sequence number, etc.
- Many types of ordering properties:
  - Increasing, nondecreasing
  - Increasing within delta, banded-increasing(epsilon)
  - Increasing in group G
- Ordering properties are part of the data type:

    Stream TCPIP
      Ullong timestamp increasing;
      Uint SourceIP;
      Uint SequenceNbr increasing_in_group(SourceIP, ...);

Stream Operators
- Power of relational algebra: it is a closed algebra
  - Enables the composition of complex queries, e.g., COUNT DISTINCT is a COUNT(*) over a GROUP BY
- Need stream operators which produce streams
  - That is, we can deduce ordering properties of
    the output
- We have defined ordering properties to capture the semantics of the output of operators
  - Increasing in group G: group-by and aggregation
  - Banded-increasing: window join
- Implementation detail: special operators
  - Emulate complex network protocols, e.g., IP defragmentation

Basic Operators
- Selection, projection, non-stream join, etc.
- Scalar expressions: perform type imputation on temporal properties, e.g., timestamp/5000 is nondecreasing
- Join between two streams: the join predicate must define a window between ordered attributes, e.g., R.ts BETWEEN(S.ts, S.ts+epsilon)
  - The join algorithm can trade off buffer space for improved ordering properties: R.ts and S.ts both banded-increasing, vs. R.ts increasing and S.ts banded-increasing (or S.ts increasing and R.ts banded-increasing)

Additional Operators
- Stream union: merge two streams
  - Preserve an ordering property
- Stream sort
  - Improve an ordering property
- User-defined operators

Group-by and Aggregation
- We need to determine when to open and when to flush groups based on the tuple stream
  - GOPEN(t, G): the set of groups to create when tuple t arrives, given that the current set of groups is G
  - GCLOSE(t, g): returns TRUE if group g will not receive any further tuples, based on the attributes of t
- Complex aggregation: each aggregate has an associated predicate, and a tuple contributes to the aggregate only if it satisfies the predicate
  - Note: in general, this predicate defines a join condition between G and the tuple stream
- Correlated aggregates: in some cases (especially when defining sequences) we can even compute correlated aggregates
  - Recall the Defining Sequences example (slide 7)

Optimization
- Conventional optimization
  - Push selection and projection as low as possible
  - Join order optimization
- Operator-specific optimization
  - Better implementations
  - Search for predicates which allow operator-specific optimizations
- Temporal property optimization
  - Ordering properties of the input vs. operator speed vs. ordering properties of the output

Gigascope
- Fast and flexible network monitor
  - Submit SQL-like queries to obtain a monitoring stream
  - Monitors Gigabit Ethernet (1 Gbps × 2 directions)
- Aggressive optimizations
  - Execute some or all of the queries in the Network Interface Card (NIC)
- Goals
  - Execute queries over every byte of every packet in the link
  - Layer-7 queries: reconstruct TCP sessions, interpret streaming media control traffic, etc.
- Gigascope is the motivation for the stream database research
- Demo in SIGMOD 2002

Gigascope Architecture
- Stream database
  - Registry: records the semantics of the executing query nodes
  - Stream manager: routes tuples between query nodes and applications
- Two-layer architecture
  - Low-level queries: input is a sniffed packet stream
  - High-level queries: input is a tuple stream

[Figure: Gigascope architecture diagram; labeled components: NIC1, NIC2, low-level queries lq1 ... lqn, Stream Manager, Registry, high-level queries HQn, applications Appm]
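The window-join restriction on the Basic Operators slide can be illustrated with a two-way join on R.ts BETWEEN(S.ts, S.ts+epsilon), e.g. SYNACK packets (R) matched to SYN packets (S). The Python below is a hypothetical sketch, not Gigascope's generated code: it assumes both inputs are (ts, payload) pairs with nondecreasing ts, merges them in timestamp order (S first on ties), keeps only the S tuples that can still match, and emits results as each R tuple arrives. A real SYN/SYNACK query would also equate connection identifiers; that predicate is omitted here.

```python
import heapq
from collections import deque
from typing import Any, Deque, Iterable, Iterator, Tuple

Tup = Tuple[int, Any]               # (ts, payload); ts is nondecreasing per stream
Match = Tuple[int, Any, int, Any]   # (r_ts, r_payload, s_ts, s_payload)


def window_join(r_stream: Iterable[Tup], s_stream: Iterable[Tup], eps: int) -> Iterator[Match]:
    """Emit every (r, s) pair with s.ts <= r.ts <= s.ts + eps (hypothetical sketch)."""
    # Tag 0 = S, 1 = R, so that on equal timestamps the S tuple is seen first.
    tagged_s = ((ts, 0, p) for ts, p in s_stream)
    tagged_r = ((ts, 1, p) for ts, p in r_stream)
    s_window: Deque[Tup] = deque()  # S tuples that may still match a future R
    for ts, tag, payload in heapq.merge(tagged_s, tagged_r, key=lambda t: (t[0], t[1])):
        # Every later R tuple has r_ts >= ts, so S tuples older than ts - eps
        # can never match again and are dropped from the buffer.
        while s_window and s_window[0][0] < ts - eps:
            s_window.popleft()
        if tag == 0:
            s_window.append((ts, payload))
            continue
        # Each remaining S tuple satisfies ts - eps <= s_ts <= ts.
        for s_ts, s_payload in s_window:
            yield (ts, payload, s_ts, s_payload)
```

Because results are emitted in R order, the output's R.ts is nondecreasing while S.ts is only guaranteed to lie within epsilon below it; buffering R instead would favor S's order. That is one concrete point in the buffer-space vs. ordering trade-off the slide mentions.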
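The GCLOSE rule from the Group-by and Aggregation slide is easiest to see on the first typical query from the Query Language slide (bytes and packets per source IP per 5-minute interval). The Python below is a simplified, hypothetical sketch: it assumes packets arrive as (ts, source_ip, dest_port, length) tuples with ts in seconds and nondecreasing, and it treats "related to HTTP transfers" as destination port 80. Because ts // 300 inherits the nondecreasing property of ts, the first tuple in a later interval proves that every group of the earlier interval is closed.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

# Hypothetical packet record: (ts_seconds, source_ip, dest_port, length_bytes).
Packet = Tuple[int, str, int, int]
# Output row: (bucket, source_ip, n_bytes, n_packets).
Row = Tuple[int, str, int, int]

HTTP_PORT = 80  # assumption: "related to HTTP transfers" means dest port 80


def _flush(bucket: int, groups: Dict[str, List[int]]) -> Iterator[Row]:
    """Emit every group of a closed bucket, sorted by source IP."""
    for ip in sorted(groups):
        n_bytes, n_packets = groups[ip]
        yield (bucket, ip, n_bytes, n_packets)


def http_traffic_per_interval(packets: Iterable[Packet], width: int = 300) -> Iterator[Row]:
    """Group by (source_ip, ts // width), closing groups as soon as ts moves on."""
    open_bucket: Optional[int] = None
    groups: Dict[str, List[int]] = defaultdict(lambda: [0, 0])
    for ts, src, dport, length in packets:
        bucket = ts // width  # nondecreasing because ts is nondecreasing
        if open_bucket is not None:
            if bucket < open_bucket:
                raise ValueError("ordering property violated: ts decreased")
            if bucket > open_bucket:
                # GCLOSE: no later tuple can fall into open_bucket.
                yield from _flush(open_bucket, groups)
                groups.clear()
        open_bucket = bucket
        # Every tuple advances time, but only HTTP tuples update a group.
        if dport != HTTP_PORT:
            continue
        group = groups[src]  # GOPEN: the group is created on first use
        group[0] += length
        group[1] += 1
    # A finite test input ends here; flush whatever is still open.
    if open_bucket is not None:
        yield from _flush(open_bucket, groups)
```

Only the groups of the current interval are held in memory, and each interval's results are pushed out as soon as the next interval starts. Without the ordering property on ts, no group could be closed before the stream ends.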
Query Processing Architecture
- Query nodes represent a single-block query and are generated code
- All query nodes live in a run-time system and follow an API:
  - Callbacks: initialize, accept_tuple, accept_command, free
  - Functions: post_tuple, standard and user-defined functions
- Low-level queries
  - Limited set of query nodes (selection/projection, aggregation)
  - Tight constraints on resource usage
- High-level queries
  - Much wider variety of operators
  - Use operator templates, specialized with generated functors
  - The accept_tuple callback routes tuples through the operators in the query node

Splitting a query
- Network packets are presented only to low-level queries
  - The NIC has two 88 MHz processors but only 1 MB of memory
  - Limited set of operators, available functions, etc.
- If a query cannot be executed entirely in the NIC, it is split into low-level queries and high-level queries
  - That is, perform as much selection as possible in the NIC
  - Also perform partial aggregation, and complete the aggregation in a high-level query

Generating Code
- Parse the query: Flex, Bison
  - Build the parse tree
- Analyze the parse tree
  - Build symbol tables: table references, column references, group-by variables, aggregate references, etc.
  - Determine the type of query: selection, join, aggregation, etc.
  - Analyze the predicates: convert to CNF
- Build query nodes (and the query plan)
  - Fill in placeholders (the selection predicate, etc.)
- Split the query
  - The result is one or more queries
- Optimize
  the query plan
  - Perform further code-generation-time analysis
- Generate the code

Other nice features
- Every query can accept parameters
  - Necessary flexibility, because changing low-level queries requires rebuilding the RTS (run-time system)
- More generally, each query accepts commands
  - Load new parameters, report statistics (and errors), etc.
  - High-level queries relay commands to the low-level queries
- Stream-based architecture
  - Easy to add nested queries on the fly
  - Easy extension to distributed queries (we think)
- Executables are self-documenting
  - The source code contains the schema and the query
  - Library for parsing and interpreting the query
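The Query Processing Architecture slide names the callbacks a query node implements (initialize, accept_tuple, accept_command, free) and the post_tuple function it calls, and the Other nice features slide says every query accepts parameters and commands. The Python class below is a hypothetical sketch of a low-level selection/projection node in that style. The class name, method signatures, the command names load_params and report_stats, and the tuple fields are all assumptions for illustration, not Gigascope's actual API.

```python
from typing import Any, Callable, Dict, Optional

Tuple_ = Dict[str, Any]  # hypothetical tuple representation: field name -> value


class PortFilterNode:
    """Hypothetical selection/projection query node with a runtime parameter."""

    def __init__(self) -> None:
        self.params: Dict[str, Any] = {}
        self.post_tuple: Callable[[Tuple_], None] = lambda t: None
        self.seen = 0
        self.passed = 0

    def initialize(self, params: Dict[str, Any], post_tuple: Callable[[Tuple_], None]) -> None:
        # Parameters arrive at start-up, so the node need not be regenerated.
        self.params = dict(params)
        self.post_tuple = post_tuple

    def accept_tuple(self, t: Tuple_) -> None:
        # Selection: keep tuples whose dest_port equals the $port parameter.
        self.seen += 1
        if t["dest_port"] == self.params["port"]:
            self.passed += 1
            # Projection: pass only ts and source_ip downstream.
            self.post_tuple({"ts": t["ts"], "source_ip": t["source_ip"]})

    def accept_command(self, cmd: str, arg: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, int]]:
        if cmd == "load_params":
            # Swap in new parameter values without rebuilding anything.
            self.params.update(arg or {})
            return None
        if cmd == "report_stats":
            return {"seen": self.seen, "passed": self.passed}
        raise ValueError(f"unknown command: {cmd}")

    def free(self) -> None:
        # Release state; further posts become no-ops.
        self.params.clear()
        self.post_tuple = lambda t: None
```

In this sketch a high-level node could forward a load_params command to the low-level nodes it reads from, which is the relaying behavior the slide describes.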
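The Splitting a query slide says that when a query cannot run entirely in the NIC, the NIC performs selection and partial aggregation and a high-level query completes the aggregation. One common way to do this under a tight memory budget, sketched below as an assumption rather than a description of Gigascope's implementation, is a small bounded table in the low-level query that ships a partial count upstream whenever the table is full. The function names and the capacity parameter (standing in for the NIC's limited memory) are hypothetical.

```python
from collections import Counter, OrderedDict
from typing import Dict, Hashable, Iterable, Iterator, Tuple


def low_level_partial_counts(keys: Iterable[Hashable], capacity: int) -> Iterator[Tuple[Hashable, int]]:
    """Count per key using at most `capacity` table entries; evict oldest partials."""
    if capacity < 1:
        raise ValueError("capacity must be at least 1")
    table: "OrderedDict[Hashable, int]" = OrderedDict()
    for k in keys:
        if k in table:
            table[k] += 1
            continue
        if len(table) >= capacity:
            # Table full: ship the oldest entry upstream as a partial aggregate.
            yield table.popitem(last=False)
        table[k] = 1
    # End of the input (or of a time window): ship the remaining partials.
    yield from table.items()


def high_level_complete(partials: Iterable[Tuple[Hashable, int]]) -> Dict[Hashable, int]:
    """Finish the aggregation: partial COUNTs for the same key add up."""
    totals: Counter = Counter()
    for k, c in partials:
        totals[k] += c
    return dict(totals)
```

The split is correct for COUNT (and SUM) because partial results combine by addition; the same key may appear in several partials, and the high-level query simply adds them. An aggregate without such a combining step could not be split this way.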