跳到主要內容

Streams (大數據分析必要工具)

  


頻道需要你的支持,謝謝你成為我們的會員(加入會員)

影片1 Streams API 的重要性與基本觀念介紹



影片2 forEach, filter



影片3 map, peek



影片4 Search methods 與 Optional<T>



影片5 Data methods 與 Comparator<T> 



影片6 Sort 與 Comparator Updates



影片7 toList, toSet 與 toMap





影片8 averaging, summing 與 groupingBy



影片9 partitioningBy,  joining,  Parallel Stream 與  Reduction



範例


一、          
java.util.stream

Streams Java SE 8 中新增的一群新型態。 Collections 相似,是集合一群元素的序列。但是,有不同的目標:Collections 的重點是對元素進行有效的管理與存取;Streams 著重在對元素進行運算。對於大量資料的歸納與分析,有相當顯著的效能與實用性的提升。

 

二、           Method Chaining

Streams 支援 sequential parallel 操作其元素。這些操作可以在單一敍述中連續呼叫。此功能稱為 “Method Chaining”

 

三、           Stream 特性。

l   它們是不可變動的。

l   元素被使用後,不可再使用。

l   操作鏈上的每一個 Stream 只能運算一次。

l   它們預設是依序 (sequential) 的,但也可以是並行 (parallel) 的。

l   Java API 文件提供了所有 Stream 方法的詳細資訊。

 

四、           運算分類

Streams 中運算的方法分為:

l   中繼

n   filter(), map(), peek()

l   終結

n   forEach(), count(), sum(), average(), min(), max(), collect()

l   短程終結

n   findFirst(), findAny(), anyMatch(), allMatch(), noneMatch()

 

五、           Stream Pipeline

Streams Pipeline 由來源、零個或多個中繼操作和終結操作組成:

l   來源:陣列、集合、生成器函數、I/O 通道

l   零個或多個中繼操作:將一個 Stream 轉換為另一個 Stream ,例如 filter

l   終結操作:例如 count forEach

 

Stream 可能是惰性的。僅當終結操作啟動時才對來源資料進行運算,並且僅在需要時才取用來源元素。

 

在許多情況下,Method Reference 可以取代 Lambda Expression 如果 Lambda Expression僅呼叫每一個元素物件的方法或以每一個元素物件為參數呼叫某方法,可用 Method Reference 取代。

 

六、           The forEach method

void forEach​(Consumer<? super T> action)

讓所有元素執行指定的運算。

 

七、           The filter Method

Stream<T> filter(Predicate<T> p)

用指定的條件過濾元素符合條件的元素會新增至 return Stream 物件之中。

 

八、           Extracting Data with Map

Stream<R> map(Function<? super T, ? extends R> mapper)

每一個元素依照 Function 指定的運算方式運算出結果。並將結果新增至 return Stream 物件之中。

 

map 的基本資料型別版本:

有三個方法:mapToInt, mapToLong, mapToDouble。若 Function 要運算的結果是 int, long, double,可以呼叫這些方法。它們 return IntStream, LongStream, DoubleStream 可以支援該型態資料特定的運算。

 

九、           Taking a peek

Stream<T> peek(Consumer<? super T> action)

讓所有元素執行指定的運算。運算後,元素會新增至 return Stream 物件之中。常用於列印資料。

 

十、           Search Methods

l   Optional<T> findFirst()

Stream 中的第一個元素包裝在 Optional 物件中,並 return Optional 物件。

l   boolean   allMatch​(Predicate<? super T> predicate)

如果所有元素都滿足條件,則返回 true

l   boolean   noneMatch​(Predicate<? super T> predicate)

如果沒有任何元素滿足條件,則返回 true

 

十一、                Optional Class

Optional<T>

l   java.util package 中。

l   可能包含也可能不包含 None-Null 的容器物件。

l   boolean   isPresent()

如果包含物件,返回 true

l   T get()

返回包含的物件。

l   T orElse​(T other)

如果包含物件 return 該件,否則 return other

l   許多其他可用的方法,包括 stream 在必要時返回新的 Stream object

l   Optional 的基本資料型別版本。

n   OptionalDouble, OptionalInt, OptionalLong

十二、                Nondeterministic search methods

執行結果具有不確定性的搜尋方法。實際上在並行 (parallel) 的狀況時更有效率。

l   Optional<T>    findAny()

n   Stream 中的任何一個元素包裝在 Optional 物件中,並 return Optional 物件。。

n   並行執行時,結果可能會有所不同。

l   boolean   anyMatch​(Predicate<? super T> predicate)

n   如果有任何一個元素滿足指定條件返回 true

n   並行執行時,結果可能會有所不同。

 

十三、                Stream Data Methods

l   long count()

Stream中元素的數量。

l   Optional<T> max​(Comparator<? super T> comparator)

Stream中的元素依照comparator指定的方式,找到最大值物件。將其包裝在 Optional 物件之中,再 return Optional 物件。

l   Optional<T> min​(Comparator<? super T> comparator)

Stream中的元素依照comparator指定的方式,找到最小值物件。將其包裝在 Optional 物件之中,再 return Optional 物件。

 

十四、                Performing Calculations

DoubleStream, IntStream, LongStream 提供的協助運算方法:

l   OptionalDouble average()

運算 Stream 物件中所有元素的平均值。並將其包裝在 OptionalDouble 物件之中。再返回該OptionalDouble物件。如果 Stream 是空的,會返回一個 Empty OptionalDouble 物件。

l   int/long/double sum()

運算Stream物件中所有元素的總和。依照該Stream的形態運算出不同的形態的值。

 

十五、                Sorting

l   Stream<T> sorted()

返回按自然順序排序的元素組成的 Stream

l   Stream<T> sorted(Comparator<? super T> comparator)

返回根據 Comparator 排序的元素組成的 Stream

 

十六、                Comparator Updates

l   static Comparator<T> comparing​(Function<? super T,​? extends U>  keyExtractor)

n   允許你透過 method reference lambda 指定任何欄位排序。

n   支援基本資料型別的 Stream

l   default Comparator<T>  thenComparing​(Comparator<? super T> other)

n   指定要排序的其他欄位。

l   default Comparator<T>  reversed()

Stream 中的元素反向排列。

l   static Comparator<T>     reverseOrder()

Stream 中的元素反向排列。

 

十七、                Saving Data from a Stream

l   R collect​(Collector<? super T,​A,​R> collector)

允許您將 Stream操作的結果儲存到新的資料結構。

l   Collectors 類別中提供了許多有用的 collectors

n   stream().collect(Collectors.toList());

n   stream().collect(Collectors.toSet());

n   stream().collect(Collectors.toMap() );

 

十八、                Other methods in Collectors Class

l   static Collector<T,​?,​Double> averagingDouble​(ToDoubleFunction<? super T> mapper)

l   static Collector<T,​?,​Double> averagingInt​(ToIntFunction<? super T> mapper)

l   static Collector<T,​?,​Double> averagingLong​(ToLongFunction<? super T> mapper)

產生算術平均值

l   static <T> Collector<T,​?,​Double> summingDouble​(ToDoubleFunction<? super T> mapper)

l   static <T> Collector<T,​?,​Integer> summingInt​(ToIntFunction<? super T> mapper)

l   static <T> Collector<T,​?,​Long> summingLong​(ToLongFunction<? super T> mapper)

產生總和

l   static Collector<T,​?,​Map<K,​List<T>>> groupingBy​(Function<? super T,​? extends K> classifier)

Function 指定的欄位為Key將所有元素進行分組儲存在 List 之中。返回的是一個以Function 指定的欄位為KeyList value Map 物件。

l   static <T,​K,​A,​D> Collector<T,​?,​Map<K,​D>> groupingBy​(Function<? super T,​? extends K> classifier, Collector<? super T,​A,​D> downstream)

Function 指定的欄位為Key。返回的是一個以Collector 指定的值為value Map 物件。

l   static Collector<T,​?,​Map<Boolean,​List<T>>> partitioningBy​(Predicate<? super T> predicate)

依據 Predicate 對元素進行分組儲存在 List 之中。返回以 true / false KeyList value Map 物件。

l   static Collector<CharSequence,​?,​String> joining()

按順序將輸入元素連接成字串。

 

十九、                Quick Streams with Stream.of

Stream.of 方法允許您輕鬆創建 Stream

 

二十、                Flatten Data with flatMap

使用 flatMap 方法合併多個集合中的資料。

 

二十一、      Parallel Stream

l   可以提供更好的性能

擁有多晶片和核心的機器。

l   影響性能的因素很多

Parallel 並不總是更快。

l   Using Parallel Streams

n   Collection 開始呼叫。

.parallelStream

n   Stream 敍述串列中呼叫。

.parallel .sequential(預設為 sequential)

l   呼叫適用於整個 pipeline

l   以最後一次呼叫為主。

 

二十二、      Use Stream pipeline

要讓  sequentially parallel 正確操作。最好完全不要與其它物件產生關連

l   不要在查詢期間修改資料來源。

l   操作的目標必須是無狀態的(stateless) / 不會變動。

l   不要訪問任何可能更改的狀態(state that might change) 變動的物件。

l   Streams Are Deterministic for Most Part

確定性演算法是一種演算法,給定特定輸入,將始終產生相同的輸出。sum 是一個很好的例子,因為元素組合的順序無關緊要。無論添加元素的順序如何,結果都將相同。

l   Some Are Not Deterministic

資料集合越大,兩個程式碼區塊產生不同結果的可能性就越大。 parallel stream 不按順序搜索資料。因此,它可能會首先找到符合標準的不同元素。

 

二十三、      Reduction

一種操作,它將 Stream 中的元素,通過重複的組合運算,將它們組合成一個匯總結果。與 collect 相比,使用上較為簡單。

l   使用 reduce(Function) 方法實現。

l   如果組合函數是關聯函數 reduction 可以 parallelizes

關聯意味著順序無關緊要。無論用於組合的元素順序如何結果都是相同的。

n   Examples of : sum, min, max, count

n   警告:如果你傳遞一個非關聯函數來 reduction ,你會得到錯誤的答案。該函數必須是關聯的。