# spring cloud stream 3.2学习记录
spring:
cloud:
stream:
function:
definition: testFunc
bindings:
testFunc-out-0:
destination: aurora-stream-test
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
在配置的时候,无论存在一个bean还是存在多个bean,我们都需要配置function.definition
如果没有生产者没有设置stream.bindings.xxx.destination,但是设置了rabbitmq.bindings.xxx.producer的相关属性的时候,那么交换机的名称就是方法名称
如果我们指定stream.bindings.xxx.destination
,那么这个值就是交换机的名称
其中stream.bindings.xxx.就是信道名称
# streamBridge使用
@Autowired
private StreamBridge streamBridge;
public String test() {
long l = GenerateInfoUtils.generateUid(1, 2);
boolean send = streamBridge.send("provider1-out-0", l);
return l + "发送: " + send;
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
这个send方法有多个重载,其中第一个参数,可以是绑定的名称,也可以是交换机名称(如果spring.cloud.stream.bindings.xxx-out-0.destination
没有设置的时候,会将此名称作为交换机名称)
我们可以在send方法中发送Object类型的数据,如果想要正常使用,我们还需要在配置文件中,加上一些配置,可以通过spring.cloud.stream.bindings.xxx-out-0
和spring.cloud.stream.rabbit.bindings.<channelName>.producer.
进行设置
# topic交换机
@GetMapping("/msg3")
public String test3() {
boolean send = streamBridge.send("topicPro-out-0", "sdfsdf");
return "";
}
1
2
3
4
5
2
3
4
5
spring.cloud.bindings:
topicPro-out-0:
destination: nimade
1
2
3
2
3
消费者,这里设置四个消费者
@Bean
public Consumer<String> consumer1() {
return msg -> {
System.out.println("交换机1号: " + msg);
};
}
@Bean
public Consumer<String> consumer3() {
return msg -> {
System.out.println("交换机3号: " + msg);
};
}
@Bean
public Consumer<String> consumer4() {
return msg -> {
System.out.println("交换机4号: " + msg);
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:
cloud:
stream:
bindings: # 服务的整合处理
consumer1-in-0:
destination: nimade
consumer3-in-0:
destination: nimade
consumer4-in-0:
destination: nimade
function:
definition: consumer1;consumer3;consumer4
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
如果有多个消费者的话,不管是和哪种交换机进行绑定,都一定要设置spring.cloud.function.definition
,值是方法名,如果存在多个,使用;
进行分隔开
# fanout交换机的绑定
生产者
@GetMapping("/msg4")
public String test4() {
streamBridge.send("fanoutPro-out-0",System.currentTimeMillis());
return System.currentTimeMillis() + "";
}
1
2
3
4
5
2
3
4
5
spring:
cloud:
stream:
bindings: # 服务的整合处理
fanoutPro-out-0:
destination: fanout-ex
cloud:
stream:
rabbit:
bindings:
fanoutPro-out-0:
producer:
exchangeType: fanout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
消费者
@Bean
public Consumer<String> consumer3() {
return msg -> {
System.out.println("交换机3号: " + msg);
};
}
@Bean
public Consumer<String> consumer4() {
return msg -> {
System.out.println("交换机4号: " + msg);
};
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13