
카프카, 클라이언트 흐름 정리

Pinpoint 트랜잭션 2개로 체크 되는 이슈
- 조회 요청 시, Pinpoint 트랜잭션에는 1개가 기록되어야 하는 데, 두 2개가 기록 됨.
- callstack 확인 결과, getConnection()의 executeQuery() 한 개 그리고 조회하는 executeQuery()한 개가 발생.
- 여기서 getConnection()과 이 부분에서 왜 쿼리가 발생하는지 분석.
- Springboot에서 2.0 이후로 HIKARICP를 기본적으로 사용 HIKARICP에서 getConection을 만들 때 처음에 Connect Test Query를 전달 밑의 프로퍼티를 추가를 해주면 아래 사진 처럼 SELECT 1이라고 test query를 커스텀 할 수 있다.
spring:
datasource:
hikari:
connection-test-query: SELECT 1

HikariCP 뜯어보기 2편
카프카 웹소켓 연결
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println(message.getPayload());
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "eventTimeId" + message.getPayload());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("reservationEventTimeId" + message.getPayload()));
while (true) {
ConsumerRecords<String, String> record = consumer.poll(Duration.ofMillis(100));
record.forEach(r -> {
System.out.println("Received message from " + r.topic() + " : " + r.value() + " :" + r.offset());
// 메시지 처리 로직을 여기에 추가
/* 1. 같은 아이디 일 경우에만 메시지를 전달 2. 남은 좌석을 보여주고 CLIENTS.remove(session.getId());로 연결을 끊음 */
String id = session.getId(); //메시지를 보낸 아이디
CLIENTS.entrySet().forEach(arg -> {
if (!arg.getKey().equals(id)) { //같은 아이디가 아니면 메시지를 전달합니다.
try {
arg.getValue()
.sendMessage(new TextMessage(
"Received message from " + r.topic() + ": " + r.value() + " :" + r.offset()));
} catch (IOException e) {
e.printStackTrace();
}
}
});
});
}
}