https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/datastream/

이번 페이지에서는 Client에서 JobManger에게 제출하는 Job 코드를 작성해보자.

의심스러운 신용카드 거래에 대한 알림을 제공하는 사기 탐지 시스템을 구축한다.

How to Follow Along

DOCS에는 MVN기준으로 되어있지만 Gradle이 더 익숙하기에 Gradle 및 Groovy 스타일을 이용해서 빌드.

java는 나와있는 것처럼 openJDK 11 을 새로 받아서 진행했다.

build.gradle은 다음과 같다.

plugins {
    id 'java'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '7.1.2'
}

group = 'org.practice'
version = '1.0-SNAPSHOT'

java {
    sourceCompatibility = JavaVersion.VERSION_11
    targetCompatibility = JavaVersion.VERSION_11
}

repositories {
    mavenCentral()
}

dependencies {
    // Flink DataStream API
    implementation 'org.apache.flink:flink-streaming-java:1.20.2'
    // Flink 클러스터와 통신하기 위한 클라이언트 라이브러리
    implementation 'org.apache.flink:flink-clients:1.20.2'
    implementation 'org.apache.flink:flink-walkthrough-common:1.20.2'

    // Flink 실행 시 로그를 출력하기 위한 로깅 라이브러리
    runtimeOnly 'org.apache.logging.log4j:log4j-slf4j2-impl:2.23.1'
    runtimeOnly 'org.apache.logging.log4j:log4j-api:2.23.1'
    runtimeOnly 'org.apache.logging.log4j:log4j-core:2.23.1'

    testImplementation platform('org.junit:junit-bom:5.9.1')
    testImplementation 'org.junit.jupiter:junit-jupiter'
}

application {
    mainClass = 'org.practice.DataStreamJob'
}

test {
    useJUnitPlatform()
}

FraudDetect

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;
    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    // Flink 의 상태 저장소. 각 계정 별로 독립적인 상태를 가짐
    // transient 키워드는 이 상태가 직렬화 대상이 아님을 명시
    private transient ValueState<Boolean> flagState;
    private transient ValueState<Long> timerState;

    @Override
    public void open(OpenContext openContext) {
        // 작업이 시작될 때 Flink 컨텍스트에서 상태를 초기화
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>("flag", Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timer-state", Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
        // 현재 키(계정 ID)에 대한 상태를 가져옴
        Boolean lastTransactionWasSmall = flagState.value();

        // 이전에 소액 거래가 있었는지 확인
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                // 이전에 소액 거래가 있었고, 지금 고액 거래가 발생했다면 사기 경고를 출력
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());
                collector.collect(alert);
            }
            // 패턴이 완료되었으므로 상태를 초기화
            cleanUp(context);
        }

        if (transaction.getAmount() < SMALL_AMOUNT) {
            // 현재 거래가 소액이라면, flag 상태를 true 설정
            flagState.update(true);

            // 1분 뒤에 onTimer 콜백 함수를 실행하도록 타이머를 설정
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            // 타이머 자체를 상태에 저장하여 나중에 취소할 수 있도록 함
            timerState.update(timer);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
        // 타이머가 실행되면(1분이 지나면), 상태를 초기화
        // 이는 소액 거래 후 1분 내에 고액 거래가 없었음을 의미
        timerState.clear();
        flagState.clear();
    }

    private void cleanUp(Context ctx) throws Exception {
        // 등록된 타이머를 삭제
        Long timer = timerState.value();
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
        }
        // 모든 상태를 깨끗하게 지움
        timerState.clear();
        flagState.clear();
    }
}

FraudDetectionJob

public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        // streaming 실행 환경 설정
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data Source 추가
        // TransactionSource 는 무한한 신용카드 거래 데이터 스트림을 생성합니다.
        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())

                .name("transactions");

        // 데이터를 처리
        // keyBy를 사용해 동일한 계정의 거래는 항상 동일한 태스크에서 처리되도록 합니다.
        // process 함수에 실제 사기 탐지 로직을 적용합니다.
        DataStream<Alert> alerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new FraudDetector())
                .name("fraud-detector");

        // 결과를 출력(Sink)
        // AlertSink 는 감지된 사기 거래(Alert)를 INFO 레벨로 로깅합니다.
        alerts
                .addSink(new AlertSink())
                .name("send-alerts");

        //"Fraud Detection" 이라는 이름으로 작업을 실행합니다.
        env.execute("Fraud Detection");
    }
}

build를 통해 해당 프로젝트의 jar파일을 생성하고, 이전 first step 에서 만들었던 docker-compose.yml 파일이 있는 디렉토리에서 docker compose up -d 를 통해 flink를 실행시킨다.