Real-Time GraphQL with Spring - Subscriptions and WebSockets

Push live updates to clients with GraphQL subscriptions. Build a real-time notification system using Spring GraphQL and WebSocket.
What Are GraphQL Subscriptions?
While queries fetch data once and mutations change data, subscriptions maintain a persistent connection for real-time updates:
subscription {
bookAdded {
id
title
author { name }
}
}
When a new book is added, all subscribed clients receive an update automatically. No polling required.
Use Cases
- Live notifications
- Chat applications
- Real-time dashboards
- Collaborative editing
- Live sports scores
- Stock tickers
Setting Up WebSocket Support
Dependencies
Add WebSocket support to your project:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-graphql</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
Configuration
Enable WebSocket endpoint in application.yml:
spring:
graphql:
websocket:
path: /graphql
connection-init-timeout: 60s
graphiql:
enabled: true
WebSocket Configuration Class
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS();
}
}
Defining Subscriptions in Schema
type Query {
books: [Book!]!
}
type Mutation {
createBook(input: CreateBookInput!): Book!
}
type Subscription {
bookAdded: Book!
bookUpdated(id: ID): Book!
notifications(userId: ID!): Notification!
}
type Book {
id: ID!
title: String!
author: Author!
}
type Notification {
id: ID!
type: NotificationType!
message: String!
createdAt: String!
}
enum NotificationType {
BOOK_ADDED
BOOK_UPDATED
COMMENT_ADDED
MENTION
}
input CreateBookInput {
title: String!
authorId: ID!
}
Implementing Subscriptions
Using Reactor's Flux
Spring GraphQL uses Project Reactor. Subscriptions return Flux<T>:
@Controller
public class BookSubscriptionController {
private final Sinks.Many<Book> bookSink;
public BookSubscriptionController() {
// Create a sink that replays the last event to new subscribers
this.bookSink = Sinks.many().multicast().onBackpressureBuffer();
}
@SubscriptionMapping
public Flux<Book> bookAdded() {
return bookSink.asFlux();
}
// Called by other parts of the app to publish events
public void publishBookAdded(Book book) {
bookSink.tryEmitNext(book);
}
}
Publishing Events from Mutations
@Controller
public class BookMutationController {
private final BookService bookService;
private final BookSubscriptionController subscriptionController;
public BookMutationController(BookService bookService,
BookSubscriptionController subscriptionController) {
this.bookService = bookService;
this.subscriptionController = subscriptionController;
}
@MutationMapping
public Book createBook(@Argument CreateBookInput input) {
Book book = bookService.createBook(input);
// Publish to subscribers
subscriptionController.publishBookAdded(book);
return book;
}
}
Filtered Subscriptions
Allow clients to subscribe to specific events:
@Controller
public class NotificationController {
private final Sinks.Many<Notification> notificationSink;
public NotificationController() {
this.notificationSink = Sinks.many().multicast().onBackpressureBuffer();
}
@SubscriptionMapping
public Flux<Notification> notifications(@Argument String userId) {
return notificationSink.asFlux()
.filter(notification -> notification.targetUserId().equals(userId));
}
@SubscriptionMapping
public Flux<Book> bookUpdated(@Argument String id) {
return bookUpdateSink.asFlux()
.filter(book -> id == null || book.id().equals(id));
}
public void publishNotification(Notification notification) {
notificationSink.tryEmitNext(notification);
}
}
Event-Driven Architecture
For production systems, use Spring's event system or a message broker:
Using Spring Events
// Event class
public record BookCreatedEvent(Book book, Instant timestamp) {}
// Publisher
@Service
public class BookService {
private final ApplicationEventPublisher eventPublisher;
public Book createBook(CreateBookInput input) {
Book book = // ... create book
eventPublisher.publishEvent(new BookCreatedEvent(book, Instant.now()));
return book;
}
}
// Subscription controller
@Controller
public class BookSubscriptionController {
private final Sinks.Many<Book> bookSink = Sinks.many().multicast().onBackpressureBuffer();
@EventListener
public void handleBookCreated(BookCreatedEvent event) {
bookSink.tryEmitNext(event.book());
}
@SubscriptionMapping
public Flux<Book> bookAdded() {
return bookSink.asFlux();
}
}
Using Redis Pub/Sub (Scalable)
For multiple server instances:
@Configuration
public class RedisConfig {
@Bean
public ReactiveRedisTemplate<String, Book> reactiveRedisTemplate(
ReactiveRedisConnectionFactory factory) {
RedisSerializationContext<String, Book> context = RedisSerializationContext
.<String, Book>newSerializationContext(new StringRedisSerializer())
.value(new Jackson2JsonRedisSerializer<>(Book.class))
.build();
return new ReactiveRedisTemplate<>(factory, context);
}
}
@Controller
public class BookSubscriptionController {
private final ReactiveRedisTemplate<String, Book> redisTemplate;
@SubscriptionMapping
public Flux<Book> bookAdded() {
return redisTemplate.listenToChannel("books:created")
.map(message -> message.getMessage());
}
}
@Service
public class BookService {
private final ReactiveRedisTemplate<String, Book> redisTemplate;
public Book createBook(CreateBookInput input) {
Book book = // ... create book
redisTemplate.convertAndSend("books:created", book).subscribe();
return book;
}
}
Client Implementation
JavaScript with graphql-ws
import { createClient } from 'graphql-ws';
const client = createClient({
url: 'ws://localhost:8080/graphql',
});
// Subscribe to new books
const unsubscribe = client.subscribe(
{
query: `subscription {
bookAdded {
id
title
author { name }
}
}`,
},
{
next: (data) => {
console.log('New book:', data.data.bookAdded);
// Update UI
addBookToList(data.data.bookAdded);
},
error: (err) => {
console.error('Subscription error:', err);
},
complete: () => {
console.log('Subscription completed');
},
}
);
// Later: unsubscribe
unsubscribe();
React with Apollo Client
import { useSubscription, gql } from '@apollo/client';
const BOOK_ADDED = gql`
subscription OnBookAdded {
bookAdded {
id
title
author { name }
}
}
`;
function BookList() {
const [books, setBooks] = useState([]);
const { data, loading, error } = useSubscription(BOOK_ADDED, {
onData: ({ data }) => {
setBooks(prev => [...prev, data.data.bookAdded]);
}
});
if (loading) return <p>Connecting...</p>;
if (error) return <p>Error: {error.message}</p>;
return (
<ul>
{books.map(book => (
<li key={book.id}>{book.title} by {book.author.name}</li>
))}
</ul>
);
}
Connection Lifecycle
Understanding the WebSocket connection lifecycle:
┌─────────────────────────────────────────────────────────────────┐
│ WebSocket Connection │
└─────────────────────────────────────────────────────────────────┘
Client Server
│ │
│──────────── WebSocket Handshake ──────────────────▶│
│◀─────────── Connection Accepted ──────────────────│
│ │
│──────────── connection_init ──────────────────────▶│
│◀─────────── connection_ack ───────────────────────│
│ │
│──────────── subscribe (id: "1") ──────────────────▶│
│ │
│◀─────────── next (id: "1", data) ─────────────────│
│◀─────────── next (id: "1", data) ─────────────────│
│◀─────────── next (id: "1", data) ─────────────────│
│ │
│──────────── complete (id: "1") ───────────────────▶│
│ │
│──────────── connection_terminate ─────────────────▶│
│ │
Handling Connection Authentication
Authenticate WebSocket connections:
@Component
public class SubscriptionInterceptor implements WebSocketInterceptor {
private final AuthService authService;
@Override
public Mono<Object> handleConnectionInitialization(
WebSocketSessionInfo info, Map<String, Object> payload) {
String token = (String) payload.get("authToken");
if (token == null) {
return Mono.error(new UnauthorizedException("Auth token required"));
}
return authService.validateToken(token)
.map(user -> Map.of("user", user)) // Store in session
.switchIfEmpty(Mono.error(new UnauthorizedException("Invalid token")));
}
}
Client sends token during connection:
const client = createClient({
url: 'ws://localhost:8080/graphql',
connectionParams: {
authToken: 'your-jwt-token'
},
});
Testing Subscriptions
@SpringBootTest
@AutoConfigureWebTestClient
class BookSubscriptionTest {
@Autowired
private WebTestClient webTestClient;
@Autowired
private BookSubscriptionController subscriptionController;
@Test
void shouldReceiveBookAddedEvents() {
// Create a test client for WebSocket
Flux<Book> subscription = subscriptionController.bookAdded();
StepVerifier.create(subscription.take(2))
.then(() -> {
// Simulate adding books
subscriptionController.publishBookAdded(
new Book("1", "Book 1", "author-1", null, null));
subscriptionController.publishBookAdded(
new Book("2", "Book 2", "author-1", null, null));
})
.expectNextMatches(book -> book.title().equals("Book 1"))
.expectNextMatches(book -> book.title().equals("Book 2"))
.verifyComplete();
}
}
Production Considerations
1. Connection Limits
Limit connections per user:
@Component
public class ConnectionLimitInterceptor implements WebSocketInterceptor {
private final Map<String, AtomicInteger> connectionCounts = new ConcurrentHashMap<>();
private static final int MAX_CONNECTIONS = 5;
@Override
public Mono<Object> handleConnectionInitialization(
WebSocketSessionInfo info, Map<String, Object> payload) {
String userId = extractUserId(payload);
AtomicInteger count = connectionCounts.computeIfAbsent(
userId, k -> new AtomicInteger(0));
if (count.incrementAndGet() > MAX_CONNECTIONS) {
count.decrementAndGet();
return Mono.error(new TooManyConnectionsException());
}
return Mono.just(payload);
}
}
2. Heartbeat/Keep-Alive
Configure ping/pong intervals:
spring:
graphql:
websocket:
connection-init-timeout: 60s
# Keep-alive ping every 30 seconds
server:
servlet:
session:
timeout: 30m
3. Graceful Shutdown
Handle server restarts:
@PreDestroy
public void shutdown() {
bookSink.tryEmitComplete();
}
Complete Example: Live Notifications
Here's a complete notification system:
type Subscription {
notifications(userId: ID!): Notification!
}
type Notification {
id: ID!
type: NotificationType!
title: String!
message: String!
link: String
read: Boolean!
createdAt: String!
}
enum NotificationType {
NEW_BOOK
NEW_COMMENT
MENTION
SYSTEM
}
@Controller
public class NotificationSubscriptionController {
private final Sinks.Many<Notification> sink =
Sinks.many().multicast().onBackpressureBuffer();
@SubscriptionMapping
public Flux<Notification> notifications(@Argument String userId) {
return sink.asFlux()
.filter(n -> n.targetUserId().equals(userId))
.doOnSubscribe(s -> log.info("User {} subscribed to notifications", userId))
.doOnCancel(() -> log.info("User {} unsubscribed", userId));
}
@EventListener
public void handleNewBook(BookCreatedEvent event) {
// Notify followers of the author
event.author().followers().forEach(followerId -> {
Notification notification = new Notification(
UUID.randomUUID().toString(),
NotificationType.NEW_BOOK,
followerId,
"New Book Available",
event.author().name() + " published: " + event.book().title(),
"/books/" + event.book().id(),
false,
Instant.now()
);
sink.tryEmitNext(notification);
});
}
}
Summary
| Concept | Implementation |
|---|---|
| Define subscription | type Subscription { ... } in schema |
| Return type | Flux<T> from controller method |
| Annotation | @SubscriptionMapping |
| Event publishing | Sinks.Many<T> or Spring Events |
| Scaling | Redis Pub/Sub or Kafka |
| Authentication | WebSocketInterceptor |
Subscriptions bring your GraphQL API to life with real-time capabilities. Combined with Spring's reactive support, you can build scalable, event-driven systems.
Next: DataLoader and Batch Loading - solving the N+1 problem in Spring GraphQL.