Google Cloud Platform (GCP) Cloud Dataflow Product Overview
Google Cloud Platform (GCP)
A fully managed data processing service, built on Apache Beam
GCP Cloud Dataflow Doco
Can export a Cloud Dataprep flow as a Dataflow template
Apache Beam
A portable data processing programming model
Features
Open source
A UNIFIED programming model for processing both batch and streaming data
Pipelines are PORTABLE - can be executed on multiple execution environments
EXTENSIBLE - new SDKs, IO connectors and transformation libraries can be created and shared
Concepts
Pipelines - a set of steps/transforms; starts with a read from a source; ends with a write to a sink
Languages supported - Java, Python or Go
Step / Transform - elastically scaled; applied to a PCollection, producing another PCollection; give each transform a unique name within the pipeline - transforms can be reused, and the name appears as a step in the console UI
In Java - p.apply("step name", TextIO.Read.from("gs://.../input-*.csv.gz")).apply(new TransformClass()).apply(TextIO.Write.to("gs://...")); p.run();
In Python - (pipeline | 'step name' >> beam.io.ReadFromText('gs://.../input-*.csv.gz') | beam.FlatMap(lambda line: myfunction(line)) | beam.io.WriteToText('gs://...')); pipeline.run() - see the fuller runnable sketch after this Concepts list
PCollection - a distributed, parallel collection of elements; can be bounded (batch) or unbounded (streaming); not necessarily held in memory
Sources
Sinks
SDK - provides a library of transformations and data connectors to sources and sinks
Model - a portable representation that is created from your code
Runner - passes off your model to an execution environment, to be run by an engine
Engine - e.g. Cloud Dataflow
Workers
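A minimal runnable sketch of these concepts in Python (pipeline, named transforms, PCollections, a source and a sink); the bucket paths and parse_line function are placeholders:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def parse_line(line):
        # Placeholder per-element logic; replace with real parsing
        return line.strip().split(',')

    options = PipelineOptions()  # add --runner=DataflowRunner, --project, etc. to run on GCP
    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input-*.csv')   # source -> PCollection
         | 'Parse' >> beam.Map(parse_line)                                # transform -> new PCollection
         | 'Format' >> beam.Map(lambda fields: ','.join(fields))
         | 'Write' >> beam.io.WriteToText('gs://my-bucket/output'))       # sink
    # leaving the with-block runs the pipeline and waits for it to finish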
Features
No-ops - little maintenance overhead
Built on reliable Google infrastructure
Auto-scaling of workers
Integrated with Stackdriver for monitoring and alerting
No lock-in because Apache Beam can be run elsewhere
Serverless - managed resource provisioning, performance tuning, pipeline reliability
Auto-heal in the event of faults with workers
Re-balance automatically to best utilise workers
Dataflow Templates
Can run pipeline locally or on cloud - in Python or Java
To associate a timestamp with inputs, Dataflow automatically uses the Pub/Sub message publish timestamp; for other sources you need to assign one yourself with c.outputWithTimestamp(...)
Ingesting data into a pipeline - from wildcarded files in file system, Cloud Storage, BigQuery (returns a TableRow), Pub/Sub
Writing data out - to file system, Cloud Storage, BigQuery, Pub/Sub
Can prevent sharding with .withoutSharding() - but not recommended, as it forces all writing onto a single machine; OK only for very small outputs (see the Python sketch below)
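A minimal Python sketch of the ingest/write notes above, with placeholder bucket paths; num_shards=1 plays the same role as the Java .withoutSharding() - a single output file, so only suitable for small outputs:
    import apache_beam as beam

    with beam.Pipeline() as p:
        lines = p | 'ReadCSV' >> beam.io.ReadFromText('gs://my-bucket/input-*.csv.gz')  # wildcarded read
        lines | 'WriteOut' >> beam.io.WriteToText('gs://my-bucket/output',
                                                  file_name_suffix='.txt',
                                                  num_shards=1)  # force one shard - small outputs only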
Actions
Step 1 - Receive a job
Step 2 - optimise the execution graph of the model to remove inefficiencies
Step 3 - schedule and distribute work to workers, scaling as necessary
MapReduce
Tip: Prefer Combine over GroupByKey - Combine knows the aggregation is commutative and associative, so partial results can be computed in parallel on many workers, whereas GroupByKey funnels all values for a key through a single worker
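A small Python sketch contrasting the two approaches on made-up sales data; CombinePerKey lets partial sums be computed on many workers before merging, while GroupByKey + sum shuffles every value for a key to one worker first:
    import apache_beam as beam

    sales = [('uk', 10.0), ('uk', 20.0), ('us', 5.0)]  # made-up (country, amount) pairs

    with beam.Pipeline() as p:
        kv = p | 'Create' >> beam.Create(sales)

        # Preferred: the combiner is commutative/associative, so partial sums run in parallel
        kv | 'SumCombine' >> beam.CombinePerKey(sum) | 'PrintCombine' >> beam.Map(print)

        # Works, but all values for a key land on a single worker before summing
        (kv | 'Group' >> beam.GroupByKey()
            | 'SumGrouped' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
            | 'PrintGrouped' >> beam.Map(print))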
Approach to streaming
Extract data
Apply a time window
Group by or Combine
Apply aggregate function
Windowing - group by time: assign event timestamps with c.outputWithTimestamp() instead of c.output(), then:
Fixed Windows: .apply("window", Window.into(FixedWindows.of(Duration.standardMinutes(2))))
Sliding Windows: .apply("window", Window.into(SlidingWindows.of(Duration.standardMinutes(2)).every(Duration.standardSeconds(20))))
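The two lines above are the Java form; an equivalent Python sketch, with made-up elements and hard-coded epoch timestamps standing in for real event times:
    import apache_beam as beam
    from apache_beam.transforms import window

    events = [('a', 0), ('b', 30), ('c', 150)]  # made-up (element, unix_timestamp) pairs

    with beam.Pipeline() as p:
        (p
         | 'Create' >> beam.Create(events)
         # attach the event timestamp - the counterpart of c.outputWithTimestamp()
         | 'Stamp' >> beam.Map(lambda e: window.TimestampedValue(e[0], e[1]))
         # fixed 2-minute windows; window.SlidingWindows(120, 20) gives the sliding form
         | 'Window' >> beam.WindowInto(window.FixedWindows(2 * 60))
         | 'Pair' >> beam.Map(lambda x: (x, 1))
         | 'Count' >> beam.CombinePerKey(sum)
         | 'Print' >> beam.Map(print))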
In Python
Use beam.Map( lambda word: (word, len(word)) ) for 1:1 relationships between input and output - here each word maps to one (word, length) tuple
Use beam.FlatMap( lambda line: my_generator(line, searchTerm) ) for non-1:1 relationships between input and output, typically with a generator that yields zero or more values per input
GroupBy and Combine
Combine - to aggregate - Sum, Mean, etc
GroupByKey - groups a PCollection of (key, value) tuples by key # grouped = cityAndZipCodes | beam.GroupByKey()
salesAmounts | beam.CombineGlobally(sum) to combine all items in a collection into a single aggregation/sum
salesRecords | beam.CombinePerKey(sum) to combine (key, value) tuples by key and aggregate/sum the values
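A runnable sketch of the Map / FlatMap / Combine points above; the word list and search term are made up:
    import apache_beam as beam

    def matches(line, search_term):
        # generator: yields zero or more outputs per input, hence FlatMap
        if search_term in line:
            yield line

    with beam.Pipeline() as p:
        words = p | 'Create' >> beam.Create(['java', 'python', 'go'])

        # 1:1 - each word becomes one (word, length) tuple
        (words | 'Lengths' >> beam.Map(lambda word: (word, len(word)))
               | 'SumPerKey' >> beam.CombinePerKey(sum)
               | 'P1' >> beam.Map(print))

        # non-1:1 - the generator may yield nothing for some inputs
        words | 'Filter' >> beam.FlatMap(matches, 'python') | 'P2' >> beam.Map(print)

        # aggregate the whole collection
        words | 'Len' >> beam.Map(len) | 'Total' >> beam.CombineGlobally(sum) | 'P3' >> beam.Map(print)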
In Java
"Parallel Do" ParDo acts on one item at a time - like a Map operation
Multiple instances of the class on many machines
Should NOT contain state
Good for
Filtering (choosing which inputs to emit)
Converting one Java type to another
Extracting parts of an input (e.g. fields of TableRow)
Calculating values from different parts of input
Side inputs - for when a ParDo needs more than one input collection - create a view of the extra collection, e.g. czmap = p.apply("toview", View.asMap()); then ... .apply("..", ParDo.withSideInputs(czmap))
GroupBy and Combine
Create a key-value pair KV class (in parallel on each machine with ParDo), then do an apply(GroupByKey.<String, type>create()) to create a PCollection<KV<String, Iterable<type>>>
Combine - to aggregate - Sum, Mean, etc
Combine.globally(new Sum.SumDoubleFn()) to aggregate/sum all the PCollection<Double> items
Combine.<String, Double, Double>perKey(new Sum.SumDoubleFn()) to aggregate/sum per key
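The notes above use the Java API; for comparison, a minimal Python sketch of the same ParDo + side-input pattern, with beam.pvalue.AsDict playing the role of View.asMap (the city/zip data is made up):
    import apache_beam as beam

    class EnrichWithZip(beam.DoFn):
        # stateless DoFn - many instances run in parallel, one element at a time
        def process(self, city, zip_by_city):
            yield (city, zip_by_city.get(city, 'unknown'))

    with beam.Pipeline() as p:
        cities = p | 'Cities' >> beam.Create(['london', 'paris'])
        zips = p | 'Zips' >> beam.Create([('london', 'EC1'), ('paris', '75001')])

        # side input: the (city, zip) pairs are materialised as a dict available to every worker
        (cities | 'Enrich' >> beam.ParDo(EnrichWithZip(), beam.pvalue.AsDict(zips))
                | 'Print' >> beam.Map(print))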
Exactly-Once - using your own message identifier - as opposed to the internal message id
Specify a unique id attribute when publishing to Pub/Sub: msg.publish(data, myid="1234") or p.apply(PubsubIO.writeStrings().to(topic).withIdAttribute("myid"))
Tell Dataflow which attribute to deduplicate on: p.apply(PubsubIO.readStrings().fromTopic(topic).withIdAttribute("myid"))
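A sketch of the same idea in Python (project, topic and attribute names are placeholders); ReadFromPubSub's id_label is the Python counterpart of withIdAttribute and is honoured by the Dataflow runner:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
    from google.cloud import pubsub_v1

    # publish side: carry your own id as a message attribute
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path('my-project', 'my-topic')  # placeholder names
    publisher.publish(topic_path, b'payload', myid='1234')

    # read side: tell the runner which attribute to deduplicate on
    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as p:
        (p | 'Read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic',
                                              id_label='myid')
           | 'Print' >> beam.Map(print))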
Constructs & questions for out-of-order data processing pipelines
What results are calculated? Transformations
Where in event time are results calculated? Event-time windowing
When in processing time are results materialised? Watermarks, triggers and allowed lateness
How do refinements of results relate? Accumulation modes
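A Python sketch touching all four questions (durations and data are arbitrary, and it assumes a Beam version where WindowInto accepts allowed_lateness): the combine answers "what", FixedWindows "where", the trigger plus allowed lateness "when", and the accumulation mode "how refinements relate":
    import apache_beam as beam
    from apache_beam.transforms import window, trigger

    windowing = beam.WindowInto(
        window.FixedWindows(2 * 60),                    # where: 2-minute event-time windows
        trigger=trigger.AfterWatermark(                 # when: fire at the watermark...
            early=trigger.AfterProcessingTime(30),      # ...with early speculative firings
            late=trigger.AfterCount(1)),                # ...and a firing per late element
        allowed_lateness=10 * 60,                       # accept data up to 10 minutes late
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING)  # how: refine, don't discard

    with beam.Pipeline() as p:
        (p | 'Create' >> beam.Create([('key', 1)])      # placeholder input
           | 'Window' >> windowing
           | 'Sum' >> beam.CombinePerKey(sum)           # what: the aggregation itself
           | 'Print' >> beam.Map(print))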
Concepts
Time
Event Time - related to the source of the event
Processing Time - related to the engine processing the event message
System Lag Dataflow Metric - the current maximum time, in seconds, that an item of data has been awaiting processing since it "arrived" at the input of the transformation step
Elements Added Dataflow Metric (under output collections) - how many data elements exited this step
Elements Added Dataflow Metric (for the Read PubSub Msg step of the pipeline) - the number of Pub/Sub messages read from the topic by the Pub/Sub IO connector
Watermark - a heuristic of completeness computed from the processing stream; tracks the skew - how far behind the processing engine is from the ideal of instantly processing each message
Data watermark age - the age (time since event timestamp) of the most recent item of data that has been fully processed by the pipeline
GCP Metrics doco
Streaming Data - balancing tradeoffs
Windowing model - supports unaligned event time windows: fixed, sliding, sessions
Triggering model - associates the output times of results to a runtime characteristic of the pipeline
Incremental processing model - integrates retractions and updates into the windowing and triggering models
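A short Python sketch of the session windows mentioned above (gap duration and the click data are made up); events for the same key that fall within the gap are merged into one session:
    import apache_beam as beam
    from apache_beam.transforms import window

    clicks = [('alice', 0), ('alice', 100), ('alice', 2000), ('bob', 50)]  # (user, unix_timestamp)

    with beam.Pipeline() as p:
        (p | 'Create' >> beam.Create(clicks)
           | 'Stamp' >> beam.Map(lambda e: window.TimestampedValue((e[0], 1), e[1]))
           | 'Sessions' >> beam.WindowInto(window.Sessions(10 * 60))  # 10-minute gap
           | 'CountPerUser' >> beam.CombinePerKey(sum)
           | 'Print' >> beam.Map(print))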