https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/datastream/
이번 페이지에서는 Client에서 JobManger에게 제출하는 Job 코드를 작성해보자.
의심스러운 신용카드 거래에 대한 알림을 제공하는 사기 탐지 시스템을 구축한다.
DOCS에는 MVN기준으로 되어있지만 Gradle이 더 익숙하기에 Gradle 및 Groovy 스타일을 이용해서 빌드.
java는 나와있는 것처럼 openJDK 11 을 새로 받아서 진행했다.
build.gradle은 다음과 같다.
plugins {
id 'java'
id 'application'
id 'com.github.johnrengelman.shadow' version '7.1.2'
}
group = 'org.practice'
version = '1.0-SNAPSHOT'
java {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
repositories {
mavenCentral()
}
dependencies {
// Flink DataStream API
implementation 'org.apache.flink:flink-streaming-java:1.20.2'
// Flink 클러스터와 통신하기 위한 클라이언트 라이브러리
implementation 'org.apache.flink:flink-clients:1.20.2'
implementation 'org.apache.flink:flink-walkthrough-common:1.20.2'
// Flink 실행 시 로그를 출력하기 위한 로깅 라이브러리
runtimeOnly 'org.apache.logging.log4j:log4j-slf4j2-impl:2.23.1'
runtimeOnly 'org.apache.logging.log4j:log4j-api:2.23.1'
runtimeOnly 'org.apache.logging.log4j:log4j-core:2.23.1'
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
application {
mainClass = 'org.practice.DataStreamJob'
}
test {
useJUnitPlatform()
}
FraudDetect
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
// Flink 의 상태 저장소. 각 계정 별로 독립적인 상태를 가짐
// transient 키워드는 이 상태가 직렬화 대상이 아님을 명시
private transient ValueState<Boolean> flagState;
private transient ValueState<Long> timerState;
@Override
public void open(OpenContext openContext) {
// 작업이 시작될 때 Flink 컨텍스트에서 상태를 초기화
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timer-state", Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
// 현재 키(계정 ID)에 대한 상태를 가져옴
Boolean lastTransactionWasSmall = flagState.value();
// 이전에 소액 거래가 있었는지 확인
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
// 이전에 소액 거래가 있었고, 지금 고액 거래가 발생했다면 사기 경고를 출력
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// 패턴이 완료되었으므로 상태를 초기화
cleanUp(context);
}
if (transaction.getAmount() < SMALL_AMOUNT) {
// 현재 거래가 소액이라면, flag 상태를 true 설정
flagState.update(true);
// 1분 뒤에 onTimer 콜백 함수를 실행하도록 타이머를 설정
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
// 타이머 자체를 상태에 저장하여 나중에 취소할 수 있도록 함
timerState.update(timer);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
// 타이머가 실행되면(1분이 지나면), 상태를 초기화
// 이는 소액 거래 후 1분 내에 고액 거래가 없었음을 의미
timerState.clear();
flagState.clear();
}
private void cleanUp(Context ctx) throws Exception {
// 등록된 타이머를 삭제
Long timer = timerState.value();
if (timer != null) {
ctx.timerService().deleteProcessingTimeTimer(timer);
}
// 모든 상태를 깨끗하게 지움
timerState.clear();
flagState.clear();
}
}
FraudDetectionJob
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// streaming 실행 환경 설정
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data Source 추가
// TransactionSource 는 무한한 신용카드 거래 데이터 스트림을 생성합니다.
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
// 데이터를 처리
// keyBy를 사용해 동일한 계정의 거래는 항상 동일한 태스크에서 처리되도록 합니다.
// process 함수에 실제 사기 탐지 로직을 적용합니다.
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
// 결과를 출력(Sink)
// AlertSink 는 감지된 사기 거래(Alert)를 INFO 레벨로 로깅합니다.
alerts
.addSink(new AlertSink())
.name("send-alerts");
//"Fraud Detection" 이라는 이름으로 작업을 실행합니다.
env.execute("Fraud Detection");
}
}
build를 통해 해당 프로젝트의 jar파일을 생성하고, 이전 first step 에서 만들었던 docker-compose.yml 파일이 있는 디렉토리에서 docker compose up -d 를 통해 flink를 실행시킨다.