소개
대용량 Json 처리시 유용하다
기본적으로 RequestBody 에 담겨있는 데이터는 서버 메모리로 담는 방식이기 때문에
대용량이 들어올 경우, OOM에러가 나올 수 있다.
이를 방지하기 위해 JSON Stream 방식을 이용하면 된다.
Controller
- HttpServletRequest 필수, request의 InputStream을 사용한다.
- RequestBody 어노테이션은 사용하지 않는다.
java@PostMapping("/stream") public ResponseEntity<?> stream(HttpServletRequest request) { streamService.stream(request.getInputStream()); return ResponseEntity.ok(); }
Service
- JsonParser를 이용하여, InputStream의 정보를 가져온다.
- Stream으로 실시간으로 읽어오면서 처리하기 때문에, Transactional을 이용한 데이터 저장은 비추천한다.
- 가져온 데이터는 Redis를 이용하여 바로 저장한 뒤, Redis의 데이터를 따로 저장하는 방식으로 진행한다.
java@Service @RequiredArgsConstructor public class JsonStreamingService { private final RedisTemplate<String, Object> redisTemplate; private final ObjectMapper objectMapper; private final int BUFFER_SIZE = 1000; // stream 진행 public void stream(InputStream is) { String tempToken = UUID.randomUUID().toString(); JsonFactory jsonFactory = new JsonFactory(); try (JsonParser parser = jsonFactory.createParser(is)) { // parser로 진행되는 경우, Object, Array, FieldName, Value(String, Number, Boolean)을 가져올 수 있다. // 해당 예제는 JSON Object 형태로 들어온 경우로 진행한다. Map<String, Object> map = streamObject(parser, tempToken); map.put("tmpToken", tempToken); // 객체는 바로 DB 저장하는 로직을 넣을 수 있음 } catch (IOException e) { throw new RuntimeException(e); } } // 객체 저장 private Map<String, Object> streamObject(JsonParser parser, String tempToken) throws IOException { Map<String, Object> map = new HashMap<>(); if (parser.nextToken() != JsonToken.START_OBJECT) throw new IOException("Expected Object"); // END_OBJECT가 나올때까지 반복 while (parser.nextToken() != JsonToken.END_OBJECT) { String fieldName = parser.currentName(); JsonToken valueToken = parser.nextToken(); if (valueToken == JsonToken.START_OBJECT) { map.put(fieldName, streamObject(parser, tempToken)); } else if (valueToken == JsonToken.START_ARRAY) { streamArray(parser, tempToken); } else { Object value = objectMapper.readValue(parser, Object.class); map.put(fieldName, value); } } return map; } // 배열 저장 private void streamArray(JsonParser parser, String tempToken) throws IOException { String queueKey = "task:bulk:items"; List<String> list = new ArrayList<>(); if (parser.nextToken() != JsonToken.START_ARRAY) throw new IOException("Expected Array"); // END ARRAY가 나올때까지 반복 // 배열 길이가 길수록 메모리 사용량이 늘어나므로 버퍼사이즈에 맞게 잘라가면서 Redis에 저장 while (parser.nextToken() != JsonToken.END_ARRAY) { Map<String, Object> item = streamObject(parser, tempToken); item.put("queueKey", queueKey); list.add(objectMapper.writeValueAsString(item)); } flushToRedis(queueKey, list); } // 배열 Redis 저장 private void flushToRedis(String key, List<String> buffers) { redisTemplate.executePipelined((RedisCallback<Object>) connection -> { // 1. Key 직렬화 (한 번만 수행) byte[] rawKey = redisTemplate.getStringSerializer().serialize(key); if (null == rawKey) return null; // 2. DataList의 각 항목을 byte[]로 변환하여 리스트 준비 // 최신 버전의 lPush는 byte[][] (가변 인자)를 받을 수 있어 더욱 효율적입니다. byte[][] rawValues = buffers.stream() .map(buffer -> redisTemplate.getStringSerializer().serialize(buffer)) .toArray(byte[][]::new); // 3. 최신 방식의 lPush 호출 (가변 인자 지원) if (rawValues.length > 0) { connection.listCommands().lPush(rawKey, rawValues); } return null; }); } }