Add support for Kafka Streams

This commit adds a new "Kafka Streams" entry. When selected with Spring
Cloud Stream, this automatically adds the kafka streams binder.

Closes gh-642
This commit is contained in:
Stephane Nicoll 2018-04-10 08:29:09 +02:00
parent 8a0453ecdb
commit b5be653f7c
3 changed files with 97 additions and 35 deletions

View File

@ -24,21 +24,25 @@ import org.springframework.stereotype.Component;
/** /**
* Determine the appropriate Spring Cloud stream dependency to use based on the * Determine the appropriate Spring Cloud stream dependency to use based on the
* selected messaging technology. * selected integration technology.
* <p> * <p>
* Does not replace the messaging technology jar by the relevant binder. If more than * Does not replace the integration technology jar by the relevant binder. If more than
* one tech is selected, it is far more easier to remove the unnecessary binder jar than * one tech is selected, it is far more easier to remove the unnecessary binder jar than
* to figure out the name of the tech jar to add to keep support for that technology. * to figure out the name of the tech jar to add to keep support for that technology.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
*/ */
@Component @Component
class SpringCloudMessagingRequestPostProcessor class SpringCloudStreamRequestPostProcessor
extends AbstractProjectRequestPostProcessor { extends AbstractProjectRequestPostProcessor {
static final Dependency KAFKA_BINDER = Dependency.withId("cloud-stream-binder-kafka", static final Dependency KAFKA_BINDER = Dependency.withId("cloud-stream-binder-kafka",
"org.springframework.cloud", "spring-cloud-stream-binder-kafka"); "org.springframework.cloud", "spring-cloud-stream-binder-kafka");
static final Dependency KAFKA_STREAMS_BINDER = Dependency.withId(
"cloud-stream-binder-kafka-streams", "org.springframework.cloud",
"spring-cloud-stream-binder-kafka-streams");
static final Dependency RABBIT_BINDER = Dependency.withId( static final Dependency RABBIT_BINDER = Dependency.withId(
"cloud-stream-binder-rabbit", "org.springframework.cloud", "cloud-stream-binder-rabbit", "org.springframework.cloud",
"spring-cloud-stream-binder-rabbit"); "spring-cloud-stream-binder-rabbit");
@ -67,6 +71,9 @@ class SpringCloudMessagingRequestPostProcessor
} }
// Spring Cloud Stream specific // Spring Cloud Stream specific
if (hasSpringCloudStream || hasReactiveSpringCloudStream) { if (hasSpringCloudStream || hasReactiveSpringCloudStream) {
if (hasDependencies(request, "kafka-streams")) {
request.getResolvedDependencies().add(KAFKA_STREAMS_BINDER);
}
request.getResolvedDependencies().add(SCS_TEST); request.getResolvedDependencies().add(SCS_TEST);
} }
} }

View File

@ -700,8 +700,18 @@ initializr:
description: Accessing Data with GemFire description: Accessing Data with GemFire
- rel: reference - rel: reference
href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#boot-features-gemfire href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#boot-features-gemfire
- name: Messaging - name: Integration
content: content:
- name: Spring Integration
id: integration
description: Common spring-integration modules
weight: 100
links:
- rel: guide
href: https://spring.io/guides/gs/integration/
description: Integrating Data
- rel: reference
href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#boot-features-integration
- name: RabbitMQ - name: RabbitMQ
id: amqp id: amqp
description: Advanced Message Queuing Protocol via spring-rabbit description: Advanced Message Queuing Protocol via spring-rabbit
@ -727,6 +737,25 @@ initializr:
links: links:
- rel: reference - rel: reference
href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#boot-features-kafka href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#boot-features-kafka
- name: Kafka Streams
id: kafka-streams
weight: 90
description: Support for building stream processing applications with Apache Kafka Streams
versionRange: 2.0.0.RELEASE
groupId: org.apache.kafka
artifactId: kafka-streams
version: 1.0.1
starter: false
links:
- rel: guide
href: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples
description: Samples for using Kafka Streams with Spring Cloud stream
- rel: reference
href: https://docs.spring.io/spring-kafka/docs/current/reference/html/_reference.html#kafka-streams
description: Kafka Streams Support in Spring Kafka
- rel: reference
href: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_kafka_streams_binding_capabilities_of_spring_cloud_stream
description: Kafka Streams Binding Capabilities of Spring Cloud Stream
- name: JMS (ActiveMQ) - name: JMS (ActiveMQ)
id: activemq id: activemq
description: Java Message Service API via Apache ActiveMQ description: Java Message Service API via Apache ActiveMQ
@ -1115,16 +1144,6 @@ initializr:
description: Creating a Batch Service description: Creating a Batch Service
- rel: reference - rel: reference
href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#howto-batch-applications href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#howto-batch-applications
- name: Integration
id: integration
description: Common spring-integration modules
weight: 100
links:
- rel: guide
href: https://spring.io/guides/gs/integration/
description: Integrating Data
- rel: reference
href: http://docs.spring.io/spring-boot/docs/{bootVersion}/reference/htmlsingle/#boot-features-integration
- name: Mail - name: Mail
id: mail id: mail
description: javax.mail description: javax.mail

View File

@ -19,16 +19,17 @@ package io.spring.initializr.service.extension;
import io.spring.initializr.generator.ProjectRequest; import io.spring.initializr.generator.ProjectRequest;
import org.junit.Test; import org.junit.Test;
import static io.spring.initializr.service.extension.SpringCloudMessagingRequestPostProcessor.KAFKA_BINDER; import static io.spring.initializr.service.extension.SpringCloudStreamRequestPostProcessor.KAFKA_BINDER;
import static io.spring.initializr.service.extension.SpringCloudMessagingRequestPostProcessor.RABBIT_BINDER; import static io.spring.initializr.service.extension.SpringCloudStreamRequestPostProcessor.KAFKA_STREAMS_BINDER;
import static io.spring.initializr.service.extension.SpringCloudMessagingRequestPostProcessor.SCS_TEST; import static io.spring.initializr.service.extension.SpringCloudStreamRequestPostProcessor.RABBIT_BINDER;
import static io.spring.initializr.service.extension.SpringCloudStreamRequestPostProcessor.SCS_TEST;
/** /**
* Tests for {@link SpringCloudMessagingRequestPostProcessor}. * Tests for {@link SpringCloudStreamRequestPostProcessor}.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
*/ */
public class SpringCloudMessagingRequestPostProcessorTests public class SpringCloudStreamRequestPostProcessorTests
extends AbstractRequestPostProcessorTests { extends AbstractRequestPostProcessorTests {
@Test @Test
@ -55,24 +56,40 @@ public class SpringCloudMessagingRequestPostProcessorTests
.hasDependenciesCount(5); .hasDependenciesCount(5);
} }
@Test
public void springCloudStreamWithKafkaStreams() {
ProjectRequest request = createProjectRequest("cloud-stream", "kafka-streams");
request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request)
.hasDependency(getDependency("cloud-stream"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(KAFKA_STREAMS_BINDER)
.hasSpringBootStarterTest()
.hasDependency(SCS_TEST)
.hasDependenciesCount(5);
}
@Test @Test
public void springCloudStreamWithAllBinders() { public void springCloudStreamWithAllBinders() {
ProjectRequest request = createProjectRequest("cloud-stream", "amqp", "kafka"); ProjectRequest request = createProjectRequest(
"cloud-stream", "amqp", "kafka", "kafka-streams");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("cloud-stream")) .hasDependency(getDependency("cloud-stream"))
.hasDependency(getDependency("amqp")) .hasDependency(getDependency("amqp"))
.hasDependency(getDependency("kafka")) .hasDependency(getDependency("kafka"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(RABBIT_BINDER) .hasDependency(RABBIT_BINDER)
.hasDependency(KAFKA_BINDER) .hasDependency(KAFKA_BINDER)
.hasDependency(KAFKA_STREAMS_BINDER)
.hasSpringBootStarterTest() .hasSpringBootStarterTest()
.hasDependency(SCS_TEST) .hasDependency(SCS_TEST)
.hasDependenciesCount(7); .hasDependenciesCount(9);
} }
@Test @Test
public void reactiveSpringCloudStreamWithRabbit() { public void reactiveSpringCloudStreamWithRabbit() {
ProjectRequest request = createProjectRequest("reactive-cloud-stream", "amqp"); ProjectRequest request = createProjectRequest("reactive-cloud-stream", "amqp");
request.setBootVersion("2.0.0.BUILD-SNAPSHOT"); request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("reactive-cloud-stream")) .hasDependency(getDependency("reactive-cloud-stream"))
.hasDependency(getDependency("amqp")) .hasDependency(getDependency("amqp"))
@ -85,7 +102,7 @@ public class SpringCloudMessagingRequestPostProcessorTests
@Test @Test
public void reactiveSpringCloudStreamWithKafka() { public void reactiveSpringCloudStreamWithKafka() {
ProjectRequest request = createProjectRequest("reactive-cloud-stream", "kafka"); ProjectRequest request = createProjectRequest("reactive-cloud-stream", "kafka");
request.setBootVersion("2.0.0.BUILD-SNAPSHOT"); request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("reactive-cloud-stream")) .hasDependency(getDependency("reactive-cloud-stream"))
.hasDependency(getDependency("kafka")) .hasDependency(getDependency("kafka"))
@ -95,20 +112,36 @@ public class SpringCloudMessagingRequestPostProcessorTests
.hasDependenciesCount(5); .hasDependenciesCount(5);
} }
@Test
public void reactiveSpringCloudStreamWithKafkaStreams() {
ProjectRequest request = createProjectRequest(
"reactive-cloud-stream", "kafka-streams");
request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request)
.hasDependency(getDependency("reactive-cloud-stream"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(KAFKA_STREAMS_BINDER)
.hasSpringBootStarterTest()
.hasDependency(SCS_TEST)
.hasDependenciesCount(5);
}
@Test @Test
public void reactiveSpringCloudStreamWithAllBinders() { public void reactiveSpringCloudStreamWithAllBinders() {
ProjectRequest request = createProjectRequest("reactive-cloud-stream", "amqp", ProjectRequest request = createProjectRequest(
"kafka"); "reactive-cloud-stream", "amqp", "kafka","kafka-streams");
request.setBootVersion("2.0.0.BUILD-SNAPSHOT"); request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("reactive-cloud-stream")) .hasDependency(getDependency("reactive-cloud-stream"))
.hasDependency(getDependency("amqp")) .hasDependency(getDependency("amqp"))
.hasDependency(getDependency("kafka")) .hasDependency(getDependency("kafka"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(RABBIT_BINDER) .hasDependency(RABBIT_BINDER)
.hasDependency(KAFKA_BINDER) .hasDependency(KAFKA_BINDER)
.hasDependency(KAFKA_STREAMS_BINDER)
.hasSpringBootStarterTest() .hasSpringBootStarterTest()
.hasDependency(SCS_TEST) .hasDependency(SCS_TEST)
.hasDependenciesCount(7); .hasDependenciesCount(9);
} }
@Test @Test
@ -135,21 +168,23 @@ public class SpringCloudMessagingRequestPostProcessorTests
@Test @Test
public void springCloudBusWithAllBinders() { public void springCloudBusWithAllBinders() {
ProjectRequest request = createProjectRequest("cloud-bus", "amqp", "kafka"); ProjectRequest request = createProjectRequest(
"cloud-bus", "amqp", "kafka", "kafka-streams");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("cloud-bus")) .hasDependency(getDependency("cloud-bus"))
.hasDependency(getDependency("amqp")) .hasDependency(getDependency("amqp"))
.hasDependency(getDependency("kafka")) .hasDependency(getDependency("kafka"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(RABBIT_BINDER) .hasDependency(RABBIT_BINDER)
.hasDependency(KAFKA_BINDER) .hasDependency(KAFKA_BINDER)
.hasSpringBootStarterTest() .hasSpringBootStarterTest()
.hasDependenciesCount(6); .hasDependenciesCount(7);
} }
@Test @Test
public void springCloudTurbineStreamWithRabbit() { public void springCloudTurbineStreamWithRabbit() {
ProjectRequest request = createProjectRequest("cloud-turbine-stream", "amqp"); ProjectRequest request = createProjectRequest("cloud-turbine-stream", "amqp");
request.setBootVersion("2.0.0.BUILD-SNAPSHOT"); request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("cloud-turbine-stream")) .hasDependency(getDependency("cloud-turbine-stream"))
.hasDependency(getDependency("amqp")) .hasDependency(getDependency("amqp"))
@ -161,7 +196,7 @@ public class SpringCloudMessagingRequestPostProcessorTests
@Test @Test
public void springCloudTurbineStreamWithKafka() { public void springCloudTurbineStreamWithKafka() {
ProjectRequest request = createProjectRequest("cloud-turbine-stream", "kafka"); ProjectRequest request = createProjectRequest("cloud-turbine-stream", "kafka");
request.setBootVersion("2.0.0.BUILD-SNAPSHOT"); request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("cloud-turbine-stream")) .hasDependency(getDependency("cloud-turbine-stream"))
.hasDependency(getDependency("kafka")) .hasDependency(getDependency("kafka"))
@ -172,17 +207,18 @@ public class SpringCloudMessagingRequestPostProcessorTests
@Test @Test
public void springCloudTurbineStreamWithAllBinders() { public void springCloudTurbineStreamWithAllBinders() {
ProjectRequest request = createProjectRequest("cloud-turbine-stream", "amqp", ProjectRequest request = createProjectRequest(
"kafka"); "cloud-turbine-stream", "amqp", "kafka", "kafka-streams");
request.setBootVersion("2.0.0.BUILD-SNAPSHOT"); request.setBootVersion("2.0.0.RELEASE");
generateMavenPom(request) generateMavenPom(request)
.hasDependency(getDependency("cloud-turbine-stream")) .hasDependency(getDependency("cloud-turbine-stream"))
.hasDependency(getDependency("amqp")) .hasDependency(getDependency("amqp"))
.hasDependency(getDependency("kafka")) .hasDependency(getDependency("kafka"))
.hasDependency(getDependency("kafka-streams"))
.hasDependency(RABBIT_BINDER) .hasDependency(RABBIT_BINDER)
.hasDependency(KAFKA_BINDER) .hasDependency(KAFKA_BINDER)
.hasSpringBootStarterTest() .hasSpringBootStarterTest()
.hasDependenciesCount(6); .hasDependenciesCount(7);
} }
} }