diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java b/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java index 1dc560a60..13fdd7182 100644 --- a/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java +++ b/hutool-core/src/main/java/cn/hutool/core/stream/FastStream.java @@ -6,6 +6,7 @@ import cn.hutool.core.lang.Opt; import cn.hutool.core.lang.mutable.MutableInt; import cn.hutool.core.lang.mutable.MutableObj; import cn.hutool.core.map.MapUtil; +import cn.hutool.core.stream.support.StreamHelper; import cn.hutool.core.text.StrUtil; import cn.hutool.core.util.ArrayUtil; @@ -1422,8 +1423,8 @@ public class FastStream implements Stream, Iterable { *

与 jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

*
本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
 	 * FastStream.iterate(1, i -> i + 1)
-	 * 	.parallel()
-	 * 	// 顺序执行
+	 *	.parallel()
+	 *	// 顺序执行
 	 * 	.takeWhile(e -> e < 50)
 	 * 	// 并发
 	 * 	.map(e -> e + 1)
@@ -1458,8 +1459,8 @@ public class FastStream implements Stream, Iterable {
 	 * 

与 jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

*
本环节中是顺序执行的, 但是后续操作可以支持并行流: {@code
 	 * FastStream.iterate(1, i <= 100, i -> i + 1)
-	 * 	.parallel()
-	 * 	// 顺序执行
+	 *	.parallel()
+	 *	// 顺序执行
 	 * 	.dropWhile(e -> e < 50)
 	 * 	// 并发
 	 * 	.map(e -> e + 1)
diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/support/DropWhileSpliterator.java b/hutool-core/src/main/java/cn/hutool/core/stream/support/DropWhileSpliterator.java
new file mode 100644
index 000000000..0ab4433a9
--- /dev/null
+++ b/hutool-core/src/main/java/cn/hutool/core/stream/support/DropWhileSpliterator.java
@@ -0,0 +1,72 @@
+package cn.hutool.core.stream.support;
+
+import java.util.Comparator;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+
+/**
+ * dropWhile 的 Spliterator
+ * 

借鉴自StreamEx

+ * + * @author emptypoint + * @since 6.0.0 + */ +class DropWhileSpliterator implements Spliterator { + + static DropWhileSpliterator create(Spliterator source, Predicate predicate) { + return new DropWhileSpliterator<>(source, predicate); + } + + private final Spliterator source; + private final Predicate predicate; + private boolean isFound = false; + + private DropWhileSpliterator(Spliterator source, Predicate predicate) { + this.source = source; + this.predicate = predicate; + } + + @Override + public boolean tryAdvance(Consumer action) { + boolean hasNext = true; + // 如果 还没找到 并且 流中还有元素 继续找 + while (!isFound && hasNext) { + hasNext = source.tryAdvance(e -> { + if (!predicate.test(e)) { + // 第一次不匹配 + isFound = true; + action.accept(e); + } + }); + } + + // 对找到的元素进行后续处理 + if (isFound) { + source.forEachRemaining(action); + } + + // 该环节已经处理完成 + return false; + } + + @Override + public Spliterator trySplit() { + return null; + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return source.characteristics() & ~Spliterator.SIZED; + } + + @Override + public Comparator getComparator() { + return source.getComparator(); + } +} diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/support/IterateSpliterator.java b/hutool-core/src/main/java/cn/hutool/core/stream/support/IterateSpliterator.java new file mode 100644 index 000000000..71df09b7b --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/support/IterateSpliterator.java @@ -0,0 +1,76 @@ +package cn.hutool.core.stream.support; + +import java.util.Objects; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; + +/** + * 无限有序流 的Spliterator + * + * @author VampireAchao + * @since 6.0.0 + */ +class IterateSpliterator extends Spliterators.AbstractSpliterator { + public static IterateSpliterator create(T seed, Predicate hasNext, UnaryOperator next) { + return new IterateSpliterator<>(seed, hasNext, next); + } + + /** + * Creates a spliterator reporting the given estimated size and + * additionalCharacteristics. + */ + IterateSpliterator(T seed, Predicate hasNext, UnaryOperator next) { + super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.IMMUTABLE); + this.seed = seed; + this.hasNext = hasNext; + this.next = next; + } + + private final T seed; + private final Predicate hasNext; + private final UnaryOperator next; + private T prev; + private boolean started; + private boolean finished; + + @Override + public boolean tryAdvance(Consumer action) { + Objects.requireNonNull(action); + if (finished) { + return false; + } + T t; + if (started) { + t = next.apply(prev); + } else { + t = seed; + started = true; + } + if (!hasNext.test(t)) { + prev = null; + finished = true; + return false; + } + prev = t; + action.accept(prev); + return true; + } + + @Override + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + if (finished) { + return; + } + finished = true; + T t = started ? next.apply(prev) : seed; + prev = null; + while (hasNext.test(t)) { + action.accept(t); + t = next.apply(t); + } + } +} diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/support/StreamHelper.java b/hutool-core/src/main/java/cn/hutool/core/stream/support/StreamHelper.java new file mode 100644 index 000000000..478cbb091 --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/support/StreamHelper.java @@ -0,0 +1,99 @@ +package cn.hutool.core.stream.support; + +import java.util.Spliterator; +import java.util.function.Predicate; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.Objects.requireNonNull; + +/** + * FastStream 辅助工具类 + * + * @author emptypoint + * @since 6.0.0 + */ +public final class StreamHelper { + private StreamHelper() { + } + + /** + * 返回无限有序流 + * 该流由 初始值 然后判断条件 以及执行 迭代函数 进行迭代获取到元素 + * + * @param 元素类型 + * @param seed 初始值 + * @param hasNext 条件值 + * @param next 用上一个元素作为参数执行并返回一个新的元素 + * @return 无限有序流 + */ + public static Stream iterate(T seed, Predicate hasNext, UnaryOperator next) { + requireNonNull(next); + requireNonNull(hasNext); + return StreamSupport.stream(IterateSpliterator.create(seed, hasNext, next), false); + } + + /** + * 保留 与指定断言 匹配时的元素, 在第一次不匹配时终止, 抛弃当前(第一个不匹配元素)及后续所有元素 + *

与 jdk9 中的 takeWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

+ *

本环节中是顺序执行的, 但是后续操作可以支持并行流

+ *

但是不建议在并行流中使用, 除非你确定 takeWhile 之后的操作能在并行流中受益很多

+ * + * @param source 源流 + * @param 元素类型 + * @param predicate 断言 + * @return 与指定断言匹配的元素组成的流 + */ + public static Stream takeWhile(Stream source, Predicate predicate) { + requireNonNull(source); + requireNonNull(predicate); + return createStatefulNewStream(source, TakeWhileSpliterator.create(source.spliterator(), predicate)); + } + + /** + * 删除 与指定断言 匹配的元素, 在第一次不匹配时终止, 返回当前(第一个不匹配元素)及剩余元素组成的新流 + *

与 jdk9 中的 dropWhile 方法不太一样, 这里的实现是个 顺序的、有状态的中间操作

+ *

本环节中是顺序执行的, 但是后续操作可以支持并行流

+ *

但是不建议在并行流中使用, 除非你确定 dropWhile 之后的操作能在并行流中受益很多

+ * + * @param source 源流 + * @param 元素类型 + * @param predicate 断言 + * @return 剩余元素组成的流 + */ + public static Stream dropWhile(Stream source, Predicate predicate) { + requireNonNull(source); + requireNonNull(predicate); + return createStatefulNewStream(source, DropWhileSpliterator.create(source.spliterator(), predicate)); + } + + // region 私有方法 + /* ================================================== 私有方法 =================================================== */ + + /** + * 根据 源流 和 新的Spliterator 生成新的流 + *

这是一个 顺序的、有状态的流

+ *

在新流的第一个节点是顺序执行的, 但是后续操作可以支持并行流

+ * + * @param source 源流 + * @param newSpliterator 新流的Spliterator + * @param 旧流的元素类型 + * @param 新流的元素类型 + * @return 新流 + */ + private static Stream createStatefulNewStream(Stream source, Spliterator newSpliterator) { + // 创建新流 + Stream newStream = StreamSupport.stream(newSpliterator, source.isParallel()); + // 如果旧流是并行流, 新流主动调用一个有状态的操作, 虽然没有意义, 但是可以让后续的无状态节点正常并发 + if (source.isParallel()) { + newStream = newStream.limit(Long.MAX_VALUE); + } + // 由于新流不与旧流的节点关联, 所以需要主动设置旧流的close方法, 哪怕几乎不可能有人在旧流上设置onClose操作 + return newStream.onClose(source::close); + } + + /* ============================================================================================================== */ + // endregion + +} diff --git a/hutool-core/src/main/java/cn/hutool/core/stream/support/TakeWhileSpliterator.java b/hutool-core/src/main/java/cn/hutool/core/stream/support/TakeWhileSpliterator.java new file mode 100644 index 000000000..dc0ca8da6 --- /dev/null +++ b/hutool-core/src/main/java/cn/hutool/core/stream/support/TakeWhileSpliterator.java @@ -0,0 +1,68 @@ +package cn.hutool.core.stream.support; + +import java.util.Comparator; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Predicate; + +/** + * takeWhile 的 Spliterator + *

借鉴自StreamEx

+ * + * @author emptypoint + * @since 6.0.0 + */ +class TakeWhileSpliterator implements Spliterator { + + static TakeWhileSpliterator create(Spliterator source, Predicate predicate) { + return new TakeWhileSpliterator<>(source, predicate); + } + + private final Spliterator source; + private final Predicate predicate; + private boolean isContinue = true; + + TakeWhileSpliterator(Spliterator source, Predicate predicate) { + this.source = source; + this.predicate = predicate; + } + + @Override + public boolean tryAdvance(Consumer action) { + boolean hasNext = true; + // 如果 还可以继续 并且 流中还有元素 则继续遍历 + while (isContinue && hasNext) { + hasNext = source.tryAdvance(e -> { + if (predicate.test(e)) { + action.accept(e); + } else { + // 终止遍历剩下的元素 + isContinue = false; + } + }); + } + // 该环节已经处理完成 + return false; + } + + @Override + public Spliterator trySplit() { + return null; + } + + @Override + public long estimateSize() { + return isContinue ? source.estimateSize() : 0; + } + + @Override + public int characteristics() { + return source.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED); + } + + @Override + public Comparator getComparator() { + return source.getComparator(); + } +} +