해당 노션에서는 다음 Github의 Rides and Fares Job 에 대해 다룬다.
https://github.com/pjw81226/flink-training/tree/release-1.17
Fork 후에 jangwoo 브랜치를 만든다. 이후의 업데이트, 코드의 변경사항은 해당 브랜치에 기록한다.
목표는 다음과 같다.
택시 탑승(TaxiRide)과 택시 요금(TaxiFare)을 하나의 RideAndFare로 결합하자.
정보 1: 123번 탑승이 시작됨 (TaxiRide)
정보 2: 123번 탑승의 요금은 15,000원 (TaxiFare)
최종 데이터: 123번 탑승은 ... (탑승 상세 정보) ... 이고, 요금은 15,000원 (RideAndFare)
execute()

env.addSource(rideSource): 실행 환경에 택시 탑승 데이터 소스를 추가하여 데이터 스트림을 생성
.filter(ride -> ride.isStart): 스트림에서 탑승시작(isStart가 true)을 알리는 이벤트만 남기고 나머지는 제외.
.keyBy(ride -> ride.rideId): 스트림을 rideId 기준으로 나눈다. 동일한 rideId를 가진 이벤트(탑승, 요금 정보)는 항상 같은 Flink 태스크에서 처리되도록 보장한다.
파이프라인

rides.connect(fares) : rides, fares 두 스트림을 연결한다. → 두 스트림의 이벤트를 함께 처리할 수 있는 ConnectedStreams 객체가 생성된다.
.flatMap(new EnrichmentFunction()) : 연결된 스트림에 사용자 커스텀 함수인 EnrichmentFunction() 을 적용한다. (후술할 예정)
.uid("enrichment") : 이 operator에 고유 ID를 부여한다. 작업 재시작 시 저장된 체크포인트를 올바른 연산자에 다시 매핑할 수 있게 해준다.
.name("enrichment"): Flink 웹 UI에 표시될 연산자의 이름을 지정한다.
.addSink(sink): 처리된 RideAndFare 객체를 출력할 싱크를 연결한다.
return env.execute("Join Rides with Fares") : 주어진 이름으로 플링크를 실행한다.
main

데이터 스트림 TaxiRideGenerator, TaxiFareGenerator와 결과를 콘솔에 출력하는 PrintSinkFunction 을 생성자에 집어넣고 RidesAndFareSolution 클래스를 job이라는 이름으로 생성한다.

Flink의 체크포인팅 설정을 코드에서 직접 하는 부분
state.backend, : 상태를 파일 시스템에 저장하도록 설정
state.checkpoints.dir: 체크포인트 데이터가 저장될 디렉터리를 지정
execution.checkpointing.interval: 10초마다 체크포인트를 생성
externalized-checkpoint-retention: 작업이 취소되어도 체크포인트를 삭제하지 않고 보존

체크포인팅 구성을 포함, Flink 실행 환경을 생성, 작업 실행한다.
EnrichmentFunction

RichCoFlatMapFunction : 두 개의 연결된 스트림에 적용할 수 있는 함수의 기본 클래스

ValueState<TaxiRide> rideState;: 상태 변수. ValueState는 단 하나의 값을 저장하는 상태 타입이다. 이 변수는 TaxiFare보다 TaxiRide 이벤트가 먼저 도착했을 경우, 해당 TaxiRide를 저장하는 데 사용됩니다.
private ValueState<TaxiFare> fareState;: TaxiFare 이벤트가 먼저 도착했을 경우, 그것을 저장하기 위한 또 다른 상태 변수
스트림이 rideId로 keyBy 되었기 때문에, Flink는 각각의 고유한 rideId마다 별도의 rideState와 fareState를 관리한다.
open

open: 이 함수가 처음 초기화될 때 한 번만 호출되는 메서드getRuntimeContext().getState(...): Flink가 관리하는 state에 접근하기 위한 핸들을 얻는 메소드new ValueStateDescriptor<>(...): 상태를 설명하는 객체. Flink가 내부적으로 상태를 관리할 수 있도록 고유한 이름(saved ride)과 저장할 데이터의 타입(TaxiRide.class)을 지정한다.flatmap1

flatMap1: 이 메서드는 첫 번째 스트림(TaxiRide 스트림)에서 이벤트가 도착할 때마다 호출된다.TaxiFare fare = fareState.value(): fareState를 확인해 짝이 되는 TaxiFare가 이미 도착해서 저장되어 있는지 확인한다.TaxiFare가 있다면 저장되어있는 TaxiFare State를 삭제하고 새로도착한 TaxiRide와 기존의 TaxiFare를 합쳐 새로운 RideAndFare 객체를 만든다. 해당 객체를 출력스트림으로 내보낸다.fareState에 저장된 요금 정보가 없다면, 지금 도착한 ride 정보를 rideState에 저장하고 나중에 요금 정보가 도착하기를 기다린다.flatmap2
