JAVA

발행자(Publisher) 구독자(Subscriber) 패턴

발행자(Publisher) 구독자(Subscriber) 패턴

pub-sub
실시간 데이터 처리 Flux & SSE콘텐츠의 실시간 배포시스템 구축을 위해 등장한 Flux는 클라이언트와 서버 사이의 연결을 지속적으로 유지하게 만들어주는 기술이다. 데이터를 한번에 보내는 것이 아닌 여러번에 나누어 보내는 것으로 연결선(Stream)을 유지할 수 있다....

Pub-Sub 패턴은 실시간 데이터 처리를 하기 위한 패턴이다. 라이브러리로 사용하면 간단하게 만들 수 있는 방법이 있지만 개념이해를 위해 직접 만들어 본다.

sub-pub

프로젝트 생성

1.’reacter-project’ 자바 프로젝트 생성
spring

java11부터 자바 리액터 라이브러리를 제공해준다.

https://projectreactor.io

2.main이 있는 클래스 ‘MyApp.java’를 생성

 

발행자(Publisher) 생성

3.발행자 클래스 ‘MyPublisher.java’를 생성
package ex01;

import java.util.Arrays;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

// 출판사 = 신문사
public class MyPublisher implements Publisher<Integer>{

  // 데이터가 순차적으로 저장되어있는 데이터 구조->순차적으로 데이터를 꺼낼 수 있다.(내부가 Queue)
  Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
  
  @Override
  public void subscribe(Subscriber<? super Integer> subscriber) {
    System.out.println("1.구독 요청");
  }

}

구독 정보를 발행해내는 클래스

  1. java11부터 제공하는 라이브러리 Publisher를 implement한다.
  2. 어떤 데이터를 제공할지는 미리 알 수 없기 때문에 제네릭으로 제공된다. 여기서는 Integer 타입으로 테스트한다.
  3. 강제성 함부를 구현한다.
  4. Iterable 타입에 데이터 리스트를 넣어준다. 데이터가 순차적으로 저장되어있는 데이터 구조->순차적으로 데이터를 꺼낼 수 있다.(내부가 Queue이다.)
Iterable 타입 사용 예제
	Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		System.out.println(its);
		System.out.println(its.iterator()); 
		
		Iterator<Integer> it = its.iterator();
		System.out.println(it.next());
		System.out.println(it.next());
		System.out.println(it.hasNext());
		System.out.println(it.next());
		while(it.hasNext()) {
			System.out.println(it.next());
		}

index로 찾을 수 없다. hashMap처럼 key 값으로 찾을 수 없다. 순차적으로 데이터를 꺼내기 위한 데이터 타입이다

 

구독정보(Subscription) 생성

4.구독정보 ‘MySubscription.java’를 생성
package ex01;

import java.util.Iterator;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독 정보를 가진 클래스
public class MySubscription implements Subscription {
  
  public MySubscription(Iterable<Integer> its, Subscriber subscriber) {
    System.out.println("구독 정보 만들어짐");
  }

  @Override
  public void request(long n) {

    System.out.println("3.신문 "+n+"개씩 받을께!");
    
  }

  @Override
  public void cancel() {
    System.out.println("구독 취소");
  }
      
}

구독 정보를 가진 클래스. MyPublisher.java에 구독 요청이 들어올 때마다 만들어진다.

  1. Subscription을 implement한다.
  2. 강제성 함부를 구현한다.

request 함수 : 구독요청 n=신문의 개수

cancel 함수 : 구독취소

원래는 topic을 넘겨줘야 해요.
5.발행자 클래스에서 구독정보를 new 한다.
package ex01;

import java.util.Arrays;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;

// 출판사 = 신문사
public class MyPublisher implements Publisher<Integer>{

  // 데이터가 순차적으로 저장되어있는 데이터 구조->순차적으로 데이터를 꺼낼 수 있다.(내부가 Q)
  Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
  
  @Override
  public void subscribe(Subscriber<? super Integer> subscriber) {
    System.out.println("1.구독 요청");
    subscriber.onSubscribe(new MySubscription(its,subscriber)); // 구독자에게 구독 정보를 넘겨준다.
  }

}

 

6.구독정보 클래스에서 발행자 클래스가 작동하면 데이터가 넘어갈 수 있게 한다.
package ex01;

import java.util.Iterator;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독 정보를 가진 클래스
public class MySubscription implements Subscription {
  
  private Iterator<Integer> it; // Iterator 타입으로 관리하기 쉽게 만든다.
  private Subscriber subscriber;
  
  public MySubscription(Iterable<Integer> its, Subscriber subscriber) {
    System.out.println("- 구독 정보 만들어짐");
    this.it = its.iterator();
    this.subscriber = subscriber;
  }

  @Override
  public void request(long n) {

    System.out.println("3.신문 "+n+"개씩 받을께!"); 
  }

  @Override
  public void cancel() {
    System.out.println("구독 취소");
  }
      
}

 

구독자(Subscriber) 생성

7.구독자 ‘MySubscriber.java’를 생성
package ex01;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독자 = 일반인 = 소비자 = 컨슈머 
public class MySubscriber implements Subscriber<Integer>{
  
  private Subscription subscription;

  @Override
  public void onSubscribe(Subscription subscription) {
    System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)");
    subscription.request(size); 
  }

  @Override
  public void onNext(Integer item) {

  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("신문 전달하다가 교통사고 났어");
  }

  @Override
  public void onComplete() {
    System.out.println("더 이상 줄 신문 없어");
  }

}

구독자 정보를 가진 클래스

  1. Subscriber를 implement한다.
  2. 강제성 함부를 구현한다.

onSubscribe 함수 :  구독 정보를 구독자에게 넘겨줄 때 실행되는 함수. 발행자 클래스에서 호출

onComplete 함수 : 응답할 정보 부재일 때 실행되는 함수

onError 함수 : 응답에 실패일 때 실행되는 함수

 

구독 요청 프로세스 실행

8.발행자와 구독자를 new 해서 구독자가 발행자를 구독할 수 있게 만든다.
package ex01;

import java.util.Arrays;
import java.util.Iterator;

public class MyApp {

  public static void main(String[] args) {
    
    MyPublisher pub = new MyPublisher();
    MySubscriber sub = new MySubscriber();
    
    pub.subscribe(sub); // 구독 시작
    
  }
  
}

 

구독 요청이 들어오면 MyApp.java 파일을 실행, subscribe 함수가 호출되면서 구독자 정보(sub)가 넘어간다.
pub.subscribe(sub);
subscribe 함수가 실행되면 구독자에게 구독 정보를 넘겨준다.
	@Override
	public void subscribe(Subscriber<? super Integer> subscriber) {
		System.out.println("1.구독 요청");
		subscriber.onSubscribe(new MySubscription(its,subscriber)); // 구독자에게 구독 정보를 넘겨준다.
	}
onSubscribe 함수로 구독 정보와 구독자 정보를 받아 구독자에게 보낸다.
@Override
	public void onSubscribe(Subscription subscription) {
		System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)");
	}
실행시 결과

1.구독 요청

– 구독정보 만들어짐

2.구독 응답 완료(구독정보 – 데이터, 구독자)

백프레셔

9.구독자에게 소비할 수 있는 구독 정보를 선택할 수 있게 동적으로 만든다.
package ex01;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독자 = 일반인 = 소비자 = 컨슈머 
public class MySubscriber implements Subscriber<Integer>{
  
  private Subscription subscription;
  
  @Override
  public void onSubscribe(Subscription subscription) {
    System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)");
    subscription.request(1); // 백플레셔로 소비할 수 있는만큼 받는다. -> 동적 
  }

  @Override
  public void onNext(Integer item) {

  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("신문 전달하다가 교통사고 났어");
  }

  @Override
  public void onComplete() {
    System.out.println("더 이상 줄 신문 없어");
  }

}
request 함수가 실행된다.
subscription.request(size);
오버라이드 : 부모의 request 요청 -> 자식 request 실행
@Override
	public void request(long n) {

		System.out.println("3.신문 "+n+"개씩 받을께!");
		
	}
실행시 결과

1.구독 요청

– 구독정보 만들어짐

2.구독 응답 완료(구독정보 – 데이터, 구독자)

3.신문 1개씩 받을께!

구독 요청 응답

10.구독자의 요청에 응답하는 onNext 함수를 호출하여 데이터를 1개씩 넘겨준다.
package ex01;

import java.util.Iterator;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독 정보를 가진 클래스
public class MySubscription implements Subscription {
  
  private Iterator<Integer> it; // 원래는 이거 동적으로 만들어야함 (topic)
  private Subscriber subscriber;
  
  public MySubscription(Iterable<Integer> its, Subscriber subscriber) {
    System.out.println(" - 구독 정보 만들어짐");
    this.it = its.iterator();
    this.subscriber = subscriber;
  }

  @Override
  public void request(long n) {

    System.out.println("3.신문 "+n+"개씩 받을께!");
    subscriber.onNext(it.next());
    
  }

  @Override
  public void cancel() {
    System.out.println("구독 취소");
  }
      
}
package ex01;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독자 = 일반인 = 소비자 = 컨슈머 
public class MySubscriber implements Subscriber<Integer>{
  
  private Subscription subscription;
  
  @Override
  public void onSubscribe(Subscription subscription) {
    System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)");
    subscription.request(1);
  }

  @Override
  public void onNext(Integer item) {
    System.out.println("4.신문 받음 : "+item);
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("신문 전달하다가 교통사고 났어");
  }

  @Override
  public void onComplete() {
    System.out.println("더 이상 줄 신문 없어");
  }

}
request 함수가 실행되면 onNext 함수가 호출되어 구독자에게 데이터를 1개 넘겨준다.
	@Override
	public void request(long n) {

		System.out.println("3.신문 "+n+"개씩 받을께!");
		subscriber.onNext(it.next());
		
	}
onNext 함수가 실행되면 구독자가 구독 정보를 받는다.
@Override
	public void onNext(Integer item) {
		System.out.println("4.신문 받음 : "+item);
	}
실행시 결과

1.구독 요청

– 구독정보 만들어짐

2.구독 응답 완료(구독정보 – 데이터, 구독자)

3.신문 1개씩 받을께!

4.신문 받음 : 1

동적 백프레셔

11.구독 정보 10개 데이터를 2개씩 모두 받을 수 있게 한다.
package ex01;

import java.util.Iterator;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독 정보를 가진 클래스
public class MySubscription implements Subscription {
  
  private Iterator<Integer> it; // 원래는 이거 동적으로 만들어야함 (topic)
  private Subscriber subscriber;
  
  public MySubscription(Iterable<Integer> its, Subscriber subscriber) {
    System.out.println(" - 구독 정보 만들어짐");
    this.it = its.iterator();
    this.subscriber = subscriber;
  }

  @Override
  public void request(long n) {

    System.out.println("3.신문 "+n+"개씩 받을께!"); 
    System.out.println("========================");
    while(n-->0) {
      if(it.hasNext()) {
        try {
          subscriber.onNext(it.next());
          Thread.sleep(500);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }else {
        subscriber.onComplete();
      }
    }
  }

  @Override
  public void cancel() {
    System.out.println("구독 취소");
  }
      
}
package ex01;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

// 구독자 = 일반인 = 소비자 = 컨슈머 
public class MySubscriber implements Subscriber<Integer>{
  
  private Subscription subscription;
  private Integer size = 2;
  private Integer maxSize = 2;
  
  @Override
  public void onSubscribe(Subscription subscription) {
    System.out.println("2.구독 응답 완료(구독정보 - 데이터, 구독자)");
    this.subscription = subscription; 
    subscription.request(size); 
  }

  @Override
  public void onNext(Integer item) {
    System.out.println("4.신문 받음 "+item);
    size--;
    if(size==0) {
      size =maxSize;
      subscription.request(size);
    }
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("신문 전달하다가 교통사고 났어");
  }

  @Override
  public void onComplete() {
    System.out.println("더 이상 줄 신문 없어");
  }

}
구독자 클래스에서 구독정보를 전역으로 받는다.

private Subscription subscription;

구독 데이터를 전역 변수로 빼준다.

private Integer size = 3;

private Integer maxSize = 3;

구독 정보를 구독자 클래스에서 전역으로 받을 수 있게 만든다.

this.subscription = subscription;

구독 정보를 전역으로 빼둔 값(2)만큼 받는다.
subscription.request(size);
size(2) 크기만큼 반복하고 데이터가 남아있으면(hasNext) onNext 함수가 다시 호출된다.
	@Override
	public void request(long n) {

		System.out.println("3.신문 "+n+"개씩 받을께!"); 
		System.out.println("========================");
		while(n-->0) {
			if(it.hasNext()) {
				try {
					subscriber.onNext(it.next());
					Thread.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}else {
				subscriber.onComplete();
			}
		}
	}
onNext 함수에서는 다시 request 함수가 호출된다.
	@Override
	public void onNext(Integer item) {
		System.out.println("4.신문 받음 "+item);
		size--;
		if(size==0) {
			size = maxSize;
			subscription.request(size);
		}
	}
request 함수와 onNext 함수가 서로 주고 받으며 데이터가 전부 출력될 때까지 실행된다.
실행시 결과

1.구독 요청

– 구독정보 만들어짐

2.구독 응답 완료(구독정보 – 데이터, 구독자)

3.신문 3 개씩 받을께!

4.신문 받음 : 1

4.신문 받음 : 2

4.신문 받음 : 3

3.신문 3개씩 받을께!

4.신문 받음 : 4

4.신문 받음 : 5

4.신문 받음 : 6

3.신문 3개씩 받을께!

4.신문 받음 : 7

4.신문 받음 : 8

4.신문 받음 : 9

3.신문 3개씩 받을께!

4.신문 받음 : 10

최신글