Lecture 7: Distributed Programming: MapReduce and Pig

Outline:
- Why, designs, challenges
- MapReduce (Google)
- Pig (Yahoo)

Distributed systems are useful: they exploit parallelism, respect resource locality, and provide security. However, they are difficult to write. One must consider many things: communication, failures and fault tolerance, consistency, synchronization, etc. These aspects are well-studied now. Why should a programmer have to worry about these "standard" problems every time they write a distributed application? It would be nice to have "frameworks" for writing distributed applications, which handle these standard problems automatically, leaving the programmer to worry about only the specific task they want to perform (as they would in a sequential, local program).

-------------

Broad considerations in designing distributed applications.

Designs:
- Grid computing (general term): Full computers connected together by a network. E.g., individually owned PCs connected over the Internet. Usually not designed to be used together, but brought together as an afterthought.
- Cluster computing: Full computers connected by high-speed links, as in a compute center. The network is designed specifically for large-scale compute tasks.
- Disaggregated computing (new): Standard resources like CPUs, memory, and storage may be split across machines. Better use of resources.

MapReduce was designed for clusters.

Challenges:
Parallelizing the application
- The application must be parallelizable. It must not be inherently sequential (a hopeless case).
- If the application is parallelizable, how to communicate state from one stage to the next? The network is usually a bottleneck.
Balancing the computations of an application across computers (spatial scheduling)
- Statically, when the designer knows how much work there is.
- Dynamically.
- Must account for heterogeneity in machines and computations.
Handling failures
- With 1,000 machines, how likely is a failure in a 1-hour period?
- Often easier than with banking applications (or the YFS lock server): many computations have no harmful side effects and have clear commit points.
Scheduling machines (temporal scheduling)
- Time-sharing
- Strict partitioning

Brief examples of distributed applications:
- Grep
- Data conversion

-----------------

MapReduce

A framework for distributing computations that have a specific /structure/, as follows:
- The end goal is to compute a final value (e.g., an aggregate) for each of a set of keys. (Also called tabulation.)
- The value for a key is computed by aggregating *intermediate values* for the key.
- The intermediate values for each key are computed from input data items, processing them one at a time.

Hence, we have a two-stage pipeline:
1. Input data sources => for each key, a list of intermediate values
2. For each key, list of intermediate values => output value

The first stage can be parallelized with one task per subset of input data items. Called a map task or mapper.
For a mapper, Input = one shard of input data. Output = list of <key, intermediate value> pairs for that shard.

The second stage can also be parallelized with one task per subset of keys. Called a reduce task or reducer.
For a reducer, Input = <key, list of intermediate values> for each key in its subset. Output = final value for each key.

Example (Word count): In a large corpus of documents, count the occurrences of each word.
Input data = the corpus of documents, key = a specific word, final value of a key (word) = count of that word, intermediate values of a key = partial counts of that word.
Mapper: Take a set of documents, count the occurrences of every word in that set, output <word, partial count> pairs.
Reducer: Take a key (word) and the list of all partial counts of that key, add up the partial counts.
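A minimal Python sketch of this word-count mapper and reducer. The names wc_map and wc_reduce and the whitespace tokenization are illustrative choices, not part of the MapReduce library's API.

from collections import Counter

def wc_map(documents):
    # Mapper: one shard of documents -> list of <word, partial count> pairs.
    counts = Counter()
    for doc in documents:
        counts.update(doc.split())      # naive whitespace tokenization
    return list(counts.items())

def wc_reduce(word, partial_counts):
    # Reducer: a word and all of its partial counts -> final count.
    return word, sum(partial_counts)

# Example: wc_map(["a b a", "b c"]) == [("a", 2), ("b", 2), ("c", 1)]
#          wc_reduce("b", [2, 3])   == ("b", 5)

Note that in this sketch the mapper already combines counts within its shard (via the Counter), so the reducer sees at most one partial count per mapper for each word.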
Example (Reverse index): In a large corpus of documents, build a map from words to the lists of documents that contain those words.
Input data = the corpus of documents, key = a specific word, final value of a key (word) = list of all document ids containing the word, intermediate values of a key = partial lists of document ids containing the word.
Mapper: Take a set of documents, lex them to separate the words, output a list of <word, document id> pairs.
Reducer: Take a key (word) and the list of all partial lists of documents containing the word, union the inner lists to produce the output list.

Implementation: (diagram; Fig. 1 in the paper)

Assume a cluster of machines.

The programmer defines map and reduce functions:
map: data shard -> list of <key, intermediate value> pairs
reduce: <key, list of intermediate values> -> final value for the key

To use, the program invokes the MapReduce library:
- provides the map and reduce functions
- provides the list of input data sources

The library creates a master process. The master:
- creates mapper and reducer tasks on available machines
- splits the input data into M shards and assigns each shard to a mapper
- *partitions the keys into R sets* and assigns each set to a reducer
- assigns tasks to machines (scheduling), monitors them for liveness, and handles faults (explained later)

A mapper:
- reads its input data shard
- produces a list of <key, intermediate value> pairs using the map function
- commits them to local disk, grouped into the R partitions
- tells the master where the intermediate values for each of the R partitions are

A reducer:
- obtains from the master its partition of keys and where the intermediate values in that partition are at each mapper
- contacts each mapper to get the intermediate values for its partition (via RPCs)
- sorts the keys in its partition
- for each key in sorted order, produces the output value by aggregating the intermediate values of the key using the reduce function
- writes the output values to a *shared file system*

When everything is done, the master returns to the caller.

Questions:
- Why do mappers store intermediate values on local storage, not a shared FS?
- Why are keys sorted in each partition?
- How are M and R chosen?
- How does the master decide where to execute each task?

Fault tolerance:
The master periodically pings each mapper and reducer task. If one doesn't respond, the master assumes it has failed. A failed task is moved to the idle state and re-executed on another node.
If a mapper fails, the reducers are told where it has been re-executed (so they get the data from there).
If a reducer fails, it is simply re-executed.

Questions:
- Can we use intermediate values that a failed mapper has already committed? What about a reducer?
- What happens if a mapper that was designated as failed reports results back to the master later?
  The master ignores the message, since the work has been re-assigned. (At-most-once semantics.) Why are at-most-once semantics for mappers important?
- What happens if a reducer that was designated as failed reports results back to the master later?
  The master can ignore it (at-most-once semantics) or do what it would have done had the failure not happened (at-least-once semantics). The reducer's temporary file is *atomically* renamed to a global name. Doing this multiple times simply overwrites the older copies. (Assuming atomic rename semantics in the global file system.) Hence, the reducer's output is idempotent.
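A minimal Python sketch of why committing the reducer's output is idempotent, using a local POSIX filesystem as a stand-in for the shared file system and letting the reducer itself perform the rename. The helper names (run_reducer, commit_output) and the tab-separated output format are illustrative assumptions.

import os

def commit_output(tmp_path, final_path):
    # rename is atomic on POSIX when both paths are on the same filesystem;
    # the paper relies on the analogous atomic rename of the shared FS.
    # Renaming a second time simply replaces the file.
    os.rename(tmp_path, final_path)

def run_reducer(partition_id, attempt_id, keyed_values, reduce_fn, out_dir):
    # keyed_values: dict mapping each key in this partition to its list of
    # intermediate values (already fetched from the mappers).
    tmp = os.path.join(out_dir, f"tmp-{partition_id}-{attempt_id}")
    with open(tmp, "w") as out:
        for key in sorted(keyed_values):          # keys sorted within the partition
            out.write(f"{key}\t{reduce_fn(key, keyed_values[key])}\n")
    commit_output(tmp, os.path.join(out_dir, f"part-{partition_id}"))

Because every attempt for partition i writes a private temporary file and renames it to the same global name part-i, a re-executed reducer with a deterministic reduce function produces the same file, and a stale attempt at worst overwrites it with identical contents.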
Semantics: If the map and reduce functions are deterministic, then the output is the same as that of a non-faulty sequential run.

Questions:
- An additional assumption is needed for the previous statement. What is it?
  That the input given to an instance of reduce is obtained by combining tuples from different mappers deterministically. Otherwise the order of combination can vary from run to run (a race between mappers), whereas the sequential run can produce only one output (since it resolves this race in exactly one way). Alternatively, if reduce is commutative, this issue does not arise.

-----------------

Pig

Pig Latin: A Not-So-Foreign Language for Data Processing. Olston, Reed, Srivastava, Kumar, Tomkins.
https://pig.apache.org/

Motivation:
Applications for data processing and analysis often use a common set of basic operations, such as sorting, filtering, grouping, and joining. Conceptually, MapReduce allows for data filtering/transformation and grouping (map) and aggregation (reduce). MapReduce by itself does not support data joins. Even for selecting and aggregating, the code must be written by hand (or re-used). So programmers cannot focus on just the high-level functionality at the level of data flows. MapReduce also does not natively support multi-stage processing; programmers must do that manually, alternating MapReduce calls and stitching the code together.

The Pig programming framework is tailored for data processing and analysis tasks and provides a high-level language for these operations.

Design:
Pig Latin: a high-level language featuring common operations for data processing and analysis
--> inspired by SQL (FILTER = SQL SELECT, COGROUP = SQL JOIN, GROUP = SQL 'GROUP BY')
--> allows embedding user-specified code (UDFs)
Pig programs are compiled to multi-stage MapReduce jobs
--> can be executed on an existing MapReduce cluster
--> faults are handled by the MapReduce framework

Example: Input table T(name, age). Compute the number of records for each age for which f(age) = true, where f is a boolean-valued UDF.

A = GROUP T by age
B = FOREACH A GENERATE group, COUNT($1)
C = FILTER B by f(age) = true

How does this run on MapReduce?
A = GROUP T by age
--> Map: identity function, output key = age; Reduce: concatenate records; ends up grouping by key (age)
B = FOREACH A GENERATE group, COUNT($1)
--> Map: identity, output key = age; Reduce: compute the set size; for each key (age), gives the count
C = FILTER B by f(age)
--> Map: each record maps to itself if f(age) = true, else it is removed, output key irrelevant; Reduce: identity

Total: 3 sequential MapReduce computations.

Optimizations:
- Program rewriting
  --> leverages techniques from SQL query optimization
- MapReduce-specific optimizations

Example: How the optimizations apply to the previous example.

Step 1 (SQL-level optimization): In any sequence of commands, early filtering is /likely/ better since it reduces the number of records that later operations process. (This has nothing to do with MapReduce.)

Z = FILTER T by f(age) = true
A = GROUP Z by age
B = FOREACH A GENERATE group, COUNT($1)

How does this run on MapReduce?
Z = FILTER T by f(age) = true
--> Map: each record maps to itself if f(age) = true, else it is removed, output key irrelevant; Reduce: identity
A = GROUP Z by age
--> Map: identity function, output key = age; Reduce: concatenate records; ends up grouping by key (age)
B = FOREACH A GENERATE group, COUNT($1)
--> Map: identity, output key = age; Reduce: compute the set size; for each key (age), gives the count
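To make the walkthrough concrete, here is a Python sketch of the kind of map/reduce pairs the three stages above correspond to. This is an illustration, not the code Pig actually generates; it assumes records of T are (name, age) tuples and uses a placeholder predicate for the UDF f.

def f(age):                          # placeholder for the user-defined predicate
    return age >= 18

# Stage 1: Z = FILTER T by f(age) = true
def filter_map(record):
    name, age = record
    if f(age):
        yield (None, record)         # output key is irrelevant for this stage
def filter_reduce(key, records):
    for r in records:                # identity
        yield r

# Stage 2: A = GROUP Z by age
def group_map(record):
    name, age = record
    yield (age, record)              # output key = age
def group_reduce(age, records):
    yield (age, list(records))       # concatenate the records for this age

# Stage 3: B = FOREACH A GENERATE group, COUNT($1)
def count_map(grouped):
    age, records = grouped
    yield (age, records)             # identity, output key = age
def count_reduce(age, bags):
    for records in bags:
        yield (age, len(records))    # the set size = count for this age

Each stage here is still a separate MapReduce job, with a full shuffle between stages; Step 2 below collapses them.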
Step 2 (MapReduce-specific optimization): Collapse consecutive map/reduce stages where possible. Here, the first map-reduce-map sequence collapses, and the last reduce-map-reduce sequence collapses.

Z = FILTER T by f(age) = true
A = GROUP Z by age
B = FOREACH A GENERATE group, COUNT($1)
--> Map: each record maps to itself if f(age) = true, else it is removed, output key = age; Reduce: count for each key.

Just a single MapReduce!

If Step 1 is omitted, we still get a single MapReduce in Step 2, but the shuffle (sorting on keys) may sort more.

A = GROUP T by age
B = FOREACH A GENERATE group, COUNT($1)
C = FILTER B by f(age) = true
--> Map: identity, output key = age; Reduce: count for a key if f(key) = true, else skip the key.

Here the map -> reduce phase will communicate and sort all records, not just the filtered ones => more work and more communication.
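For contrast with the three stages sketched earlier, here is a minimal Python sketch of the fully collapsed job from the optimized plan (Step 1 + Step 2): filter in the map, count in the reduce. The record format and the placeholder f are the same illustrative assumptions as before.

def f(age):                          # placeholder for the user-defined predicate
    return age >= 18

def collapsed_map(record):
    name, age = record
    if f(age):                       # early filtering: drop records in the map
        yield (age, record)          # output key = age
def collapsed_reduce(age, records):
    yield (age, sum(1 for _ in records))   # count of surviving records per age

Only the records that pass the filter are shuffled and sorted by age, which is exactly the savings over the unoptimized order above.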