Inserting Data Into Elasticsearch With The Bulk API And Java

1. Introduction

In this blog post, we show how to use the Bulk API to insert data into Elasticsearch from Java. The Bulk API lets you perform many create, index, update, and delete operations in a single API call, which greatly reduces the per-request overhead of indexing data, especially for batch jobs.

2. What is the Bulk API?

The Bulk API is built for working with large amounts of data in Elasticsearch. It lets you combine many indexing, updating, and deleting operations into a single request, which is useful whenever you need to make many changes to your data at once.

Every operation folded into a bulk request saves a separate network round trip and its request-processing overhead. For even more throughput, a client can split a large data set into several bulk requests and send them concurrently.
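As a sketch of that batching idea, the plain-Java code below splits a list into fixed-size batches and submits each batch to a small thread pool. The class BulkBatcher and its sendBatch parameter are hypothetical: sendBatch stands in for whatever sends one bulk request in your application, and the batch size and thread count used in main are illustrative, not recommended values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper: batch a large list and send the batches concurrently.
public class BulkBatcher {

    // Splits items into consecutive batches of at most batchSize elements
    // (the batches are views into the input list).
    public static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            batches.add(items.subList(start, Math.min(start + batchSize, items.size())));
        }
        return batches;
    }

    // Submits every batch to a fixed thread pool. sendBatch is a stand-in for one
    // bulk request; here it only returns how many items it handled.
    public static <T> int sendConcurrently(List<T> items, int batchSize, int threads,
                                           Function<List<T>, Integer> sendBatch) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (List<T> batch : partition(items, batchSize)) {
                results.add(pool.submit(() -> sendBatch.apply(batch)));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get(); // waits for each batch and rethrows its failure
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            docs.add(i);
        }
        System.out.println(partition(docs, 1000).size());               // 3
        System.out.println(sendConcurrently(docs, 1000, 2, List::size)); // 2500
    }
}
```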

For example, a bulk request that inserts two documents into an employee index contains two lines per document:

  • The first line specifies the operation to perform, along with metadata such as the target index and document ID.
  • The second line contains the request body for that operation. Because we are inserting documents, it holds the JSON document itself.
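A body of that shape, with made-up IDs and field names, could look like the following. It uses the index action and is held in a Java text block (Java 15+) so the two-line structure stays visible; the class name EmployeeBulkBody is hypothetical. Sent to POST /_bulk with the header Content-Type: application/x-ndjson, each pair of lines becomes one operation.

```java
// Hypothetical example body: two "index" actions for an employee index.
public class EmployeeBulkBody {

    // Each action line is followed by the document source, and the body
    // ends with a newline (the closing delimiter sits on its own line).
    public static final String BODY = """
        {"index":{"_index":"employee","_id":"1"}}
        {"name":"Alice","role":"Engineer"}
        {"index":{"_index":"employee","_id":"2"}}
        {"name":"Bob","role":"Analyst"}
        """;

    public static void main(String[] args) {
        System.out.print(BODY);
    }
}
```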

3. How to use the Bulk API for data insertion (with Java code snippets)

The Elasticsearch Bulk API lets you index, update, and delete multiple documents in a single request. The request body must use a specific format, newline-delimited JSON (NDJSON): each operation starts with an action line holding metadata such as the target index and document ID, followed, for create, index, and update, by a line with the document source (delete has no source line). Every line, including the last one, must end with a newline character.

Every operation takes the name of the index it targets. The index name is required unless it is given in the request path (POST /<index>/_bulk).
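To show how the action line and the optional source line differ between the four operations, here is a hypothetical helper (BulkActionLines is not part of any client library) that emits the NDJSON lines for one action. It assumes values need no JSON escaping; real code should build JSON with a proper library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that emits the NDJSON lines for one bulk action.
public class BulkActionLines {

    // index may be null when the target index is given in the request path
    // (POST /<index>/_bulk); id may be null where Elasticsearch can generate one.
    public static List<String> lines(String action, String index, String id, String sourceJson) {
        List<String> meta = new ArrayList<>();
        if (index != null) {
            meta.add("\"_index\":\"" + index + "\"");
        }
        if (id != null) {
            meta.add("\"_id\":\"" + id + "\"");
        }

        List<String> out = new ArrayList<>();
        out.add("{\"" + action + "\":{" + String.join(",", meta) + "}}");
        switch (action) {
            case "create", "index" -> out.add(sourceJson);             // full document
            case "update" -> out.add("{\"doc\":" + sourceJson + "}"); // partial document
            case "delete" -> { }                                      // no source line
            default -> throw new IllegalArgumentException("Unknown action: " + action);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> body = new ArrayList<>();
        body.addAll(lines("create", "employee", "1", "{\"name\":\"Alice\"}"));
        body.addAll(lines("index", "employee", null, "{\"name\":\"Bob\"}"));
        body.addAll(lines("update", "employee", "1", "{\"role\":\"Engineer\"}"));
        body.addAll(lines("delete", "employee", "1", null));
        // Every line, including the last, ends with a newline.
        System.out.print(String.join("\n", body) + "\n");
    }
}
```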

Bulk Create Operation

Indexes the specified document only if it doesn't already exist; if a document with the same ID exists, that item fails with a version conflict. The document to index goes on the next line.

  • Document ID is optional. If it isn't specified in the request, Elasticsearch generates one automatically.

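// operation, bulkOperationBuilder, and bulkOperation come from the controller loop shown later
CreateOperation.Builder<Object> createBuilder = new CreateOperation.Builder<>();

// The ID is optional; without it, Elasticsearch generates one
if (Objects.nonNull(operation.getId()) && !operation.getId().isEmpty()) {
    createBuilder.id(operation.getId());
}

// The payload is the document source; index is the target index
createBuilder.document(operation.getPayload()).index(operation.getIndex());
bulkOperation = bulkOperationBuilder.create(createBuilder.build()).build();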
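To make the create semantics concrete without a running cluster, here is a toy in-memory model. The class ToyCreate and its string results are hypothetical and are not Elasticsearch or client code; the sketch only mimics the two rules above: an existing ID makes the item fail with a conflict, and a missing ID is replaced by a generated one (a UUID stands in for Elasticsearch's own ID format).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical toy model of bulk "create" semantics; this is not Elasticsearch code.
public class ToyCreate {

    private final Map<String, String> docs = new HashMap<>();

    // Returns "created" for a new document, or "conflict" when the ID is taken
    // (Elasticsearch reports a 409 version conflict for that item).
    public String create(String id, String source) {
        // No ID supplied: generate one. A UUID stands in for Elasticsearch's own ID format.
        String docId = (id == null || id.isEmpty()) ? UUID.randomUUID().toString() : id;
        if (docs.containsKey(docId)) {
            return "conflict";
        }
        docs.put(docId, source);
        return "created";
    }

    public int size() {
        return docs.size();
    }

    public static void main(String[] args) {
        ToyCreate index = new ToyCreate();
        System.out.println(index.create("1", "{\"name\":\"Alice\"}"));  // created
        System.out.println(index.create("1", "{\"name\":\"Bob\"}"));    // conflict
        System.out.println(index.create(null, "{\"name\":\"Carol\"}")); // created (generated ID)
        System.out.println(index.size());                               // 2
    }
}
```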

Bulk Delete Operation

Removes the document with the given ID from the index. A delete action has no source line.

  • Document ID is a mandatory parameter. 

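DeleteOperation.Builder deleteBuilder = new DeleteOperation.Builder();

// A delete is identified only by its ID, so reject operations without one
// instead of leaving bulkOperation null
if (Objects.isNull(operation.getId()) || operation.getId().isEmpty()) {
    throw new IllegalArgumentException("DELETE requires a document ID");
}

deleteBuilder.index(operation.getIndex()).id(operation.getId());
bulkOperation = bulkOperationBuilder.delete(deleteBuilder.build()).build();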
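A toy in-memory model shows why the ID is mandatory and what happens when the document is already gone. ToyDelete and its result strings are hypothetical, not Elasticsearch code; in a real bulk response a missing document shows up as an item with result not_found (status 404) while the rest of the batch proceeds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of bulk "delete" semantics; this is not Elasticsearch code.
public class ToyDelete {

    private final Map<String, String> docs = new HashMap<>();

    public void put(String id, String source) {
        docs.put(id, source);
    }

    // A delete action carries no source line, so the ID is the only way
    // to identify the document.
    public String delete(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("delete requires a document id");
        }
        return docs.remove(id) != null ? "deleted" : "not_found";
    }

    public static void main(String[] args) {
        ToyDelete index = new ToyDelete();
        index.put("1", "{\"name\":\"Alice\"}");
        System.out.println(index.delete("1")); // deleted
        System.out.println(index.delete("1")); // not_found
    }
}
```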

Bulk Index Operation

Indexes the specified document. If a document with the same ID already exists, it is replaced and its version is incremented. The document content goes on the next line.

  • Document ID is optional. If it isn't specified in the request, Elasticsearch generates one automatically.

IndexOperation.Builder<Object> indexBuilder = new IndexOperation.Builder<>();

// The ID is optional; without it, Elasticsearch generates one
if (Objects.nonNull(operation.getId()) && !operation.getId().isEmpty()) {
    indexBuilder.id(operation.getId());
}

// An existing document with the same ID is replaced by the payload
indexBuilder.document(operation.getPayload()).index(operation.getIndex());
bulkOperation = bulkOperationBuilder.index(indexBuilder.build()).build();
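The difference from create is that index overwrites. The toy model below (ToyIndexOp is hypothetical, not Elasticsearch code) replaces the stored document wholesale and bumps a per-ID version, returning "created" or "updated" like the result field of a bulk item. ID generation is left out to keep it short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of bulk "index" semantics; this is not Elasticsearch code.
public class ToyIndexOp {

    private final Map<String, String> docs = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    // Stores the document, replacing any existing one wholesale, and bumps its version.
    // Returns "created" for a new ID and "updated" for an existing one.
    public String index(String id, String source) {
        boolean existed = docs.containsKey(id);
        docs.put(id, source);
        versions.merge(id, 1L, Long::sum); // 1 on the first write, then 2, 3, ...
        return existed ? "updated" : "created";
    }

    public String get(String id) {
        return docs.get(id);
    }

    public long version(String id) {
        return versions.getOrDefault(id, 0L);
    }

    public static void main(String[] args) {
        ToyIndexOp index = new ToyIndexOp();
        System.out.println(index.index("1", "{\"name\":\"Alice\",\"role\":\"Engineer\"}")); // created
        System.out.println(index.index("1", "{\"name\":\"Alice\"}"));                      // updated
        System.out.println(index.get("1"));     // {"name":"Alice"} (the old "role" field is gone)
        System.out.println(index.version("1")); // 2
    }
}
```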

Bulk Update Operation

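Partially updates the document: the fields in the payload are merged into the existing source. If the merge doesn't change the stored document, Elasticsearch skips the write and reports the result as noop. Because the code below sets docAsUpsert(true), a document that doesn't exist yet is created from the payload instead of failing.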

  • Document ID is a mandatory parameter. 

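UpdateOperation.Builder<Object, Object> updateBuilder = new UpdateOperation.Builder<>();

// An update is addressed by ID, so reject operations without one
// instead of leaving bulkOperation null
if (Objects.isNull(operation.getId()) || operation.getId().isEmpty()) {
    throw new IllegalArgumentException("UPDATE requires a document ID");
}

// doc() carries the partial document; docAsUpsert(true) inserts it when the ID doesn't exist
updateBuilder.action(
        new UpdateAction.Builder<>().doc(operation.getPayload()).docAsUpsert(true).build())
    .index(operation.getIndex()).id(operation.getId());
bulkOperation = bulkOperationBuilder.update(updateBuilder.build()).build();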
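The update rules (partial merge, noop detection, upsert) can be modeled in memory as well. ToyUpdate and its result strings are hypothetical, not Elasticsearch code, and for brevity it merges only top-level fields, whereas Elasticsearch merges nested objects recursively.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of bulk "update" semantics; this is not Elasticsearch code.
public class ToyUpdate {

    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    // Merges the partial document into the stored one. Returns "created" (upsert),
    // "updated", "noop" (the merge changed nothing), or "document_missing".
    public String update(String id, Map<String, Object> partial, boolean docAsUpsert) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("update requires a document id");
        }
        Map<String, Object> current = docs.get(id);
        if (current == null) {
            if (!docAsUpsert) {
                return "document_missing"; // Elasticsearch reports a 404 error for this item
            }
            docs.put(id, new HashMap<>(partial)); // upsert: the partial doc becomes the document
            return "created";
        }
        // Shallow merge for brevity; Elasticsearch merges nested objects recursively.
        Map<String, Object> merged = new HashMap<>(current);
        merged.putAll(partial);
        if (merged.equals(current)) {
            return "noop";
        }
        docs.put(id, merged);
        return "updated";
    }

    public static void main(String[] args) {
        ToyUpdate index = new ToyUpdate();
        System.out.println(index.update("1", Map.of("name", "Alice"), true));    // created
        System.out.println(index.update("1", Map.of("role", "Engineer"), true)); // updated
        System.out.println(index.update("1", Map.of("role", "Engineer"), true)); // noop
        System.out.println(index.update("2", Map.of("role", "Analyst"), false)); // document_missing
    }
}
```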

Model Class

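Create a POJO to hold the request body: a list of Operation objects, each carrying the operation type, the target index, an optional document ID, and the document payload. OperationType is an enum of the supported actions (CREATE, DELETE, INDEX, UPDATE). Lombok's @Data and @NoArgsConstructor generate the getters, setters, and no-argument constructor used when Spring binds the JSON body.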


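import java.util.List;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class BulkRequest {

  private List<Operation> operations;

  // The operation types handled by the controller's switch statement
  // (nested here for brevity; it can also live in its own file)
  public enum OperationType {
    CREATE, DELETE, INDEX, UPDATE
  }

  @Data
  @NoArgsConstructor
  public static class Operation {
    private OperationType type; // which bulk action to perform
    private String index;       // target index
    private String id;          // document ID (required for DELETE and UPDATE)
    private Object payload;     // document source (not used by DELETE)
  }
}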

Controller Class

A Spring controller handles incoming HTTP requests; here we expose a /bulk endpoint that accepts a request body in the shape of the model class above.

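The ElasticsearchClient takes a BulkRequest, which holds a list of BulkOperation objects, and executes those operations against the Elasticsearch cluster. Because our model class is also named BulkRequest, the controller refers to the client's class by its fully qualified name, co.elastic.clients.elasticsearch.core.BulkRequest.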

We use the Builder pattern to create bulk operations based on the OperationType.


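import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonFactory;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.bulk.UpdateAction;
import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
import co.elastic.clients.json.JsonpSerializable;
import co.elastic.clients.json.jackson.JacksonJsonpGenerator;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import jakarta.json.stream.JsonGenerator;

// BulkRequest here is the model class above (assumed to be in the same package);
// the client's BulkRequest is always written with its fully qualified name.
@RestController
public class BulkController {

  @Autowired
  ElasticsearchClient client;

  // POST, because the operations are sent in the request body
  @PostMapping(path = "/bulk", produces = "application/json")
  public String bulkOperation(@RequestBody(required = true) BulkRequest bulkRequest)
  throws ElasticsearchException, IOException {

    co.elastic.clients.elasticsearch.core.BulkRequest.Builder bulkRequestBuilder =
      new co.elastic.clients.elasticsearch.core.BulkRequest.Builder();

    List<BulkOperation> bulkOperations = new ArrayList<>();

    if (!CollectionUtils.isEmpty(bulkRequest.getOperations())) {

      for (BulkRequest.Operation operation : bulkRequest.getOperations()) {

        BulkOperation.Builder bulkOperationBuilder = new BulkOperation.Builder();
        BulkOperation bulkOperation = null;

        switch (operation.getType()) {
        case CREATE: {
          CreateOperation.Builder<Object> createBuilder = new CreateOperation.Builder<>();

          // ID is optional: Elasticsearch generates one if it is missing
          if (hasId(operation)) {
            createBuilder.id(operation.getId());
          }

          createBuilder.document(operation.getPayload()).index(operation.getIndex());
          bulkOperation = bulkOperationBuilder.create(createBuilder.build()).build();
          break;
        }

        case DELETE: {
          // delete operation requires a document ID
          requireId(operation);
          DeleteOperation.Builder deleteBuilder = new DeleteOperation.Builder();
          deleteBuilder.index(operation.getIndex()).id(operation.getId());
          bulkOperation = bulkOperationBuilder.delete(deleteBuilder.build()).build();
          break;
        }

        case INDEX: {
          IndexOperation.Builder<Object> indexBuilder = new IndexOperation.Builder<>();

          // ID is optional: Elasticsearch generates one if it is missing
          if (hasId(operation)) {
            indexBuilder.id(operation.getId());
          }

          indexBuilder.document(operation.getPayload()).index(operation.getIndex());
          bulkOperation = bulkOperationBuilder.index(indexBuilder.build()).build();
          break;
        }

        case UPDATE: {
          // update operation requires a document ID
          requireId(operation);
          UpdateOperation.Builder<Object, Object> updateBuilder = new UpdateOperation.Builder<>();

          // partial document; docAsUpsert(true) creates the document if it doesn't exist
          updateBuilder.action(
              new UpdateAction.Builder<>().doc(operation.getPayload()).docAsUpsert(true).build())
            .index(operation.getIndex()).id(operation.getId());

          bulkOperation = bulkOperationBuilder.update(updateBuilder.build()).build();
          break;
        }

        default:
          throw new IllegalArgumentException("Unexpected value: " + operation.getType());
        }

        bulkOperations.add(bulkOperation);
      }

      co.elastic.clients.elasticsearch.core.BulkRequest request =
        bulkRequestBuilder.operations(bulkOperations).build();

      // print the serialized request for debugging, then execute it
      System.out.println(getResponse(request));
      return getResponse(client.bulk(request));

    } else {
      // cannot perform bulk as no operation is present in request body
      return null;
    }
  }

  private static boolean hasId(BulkRequest.Operation operation) {
    return Objects.nonNull(operation.getId()) && !operation.getId().isEmpty();
  }

  // Rejects the request instead of adding an empty (null) bulk operation
  private static void requireId(BulkRequest.Operation operation) {
    if (!hasId(operation)) {
      throw new IllegalArgumentException(operation.getType() + " requires a document ID");
    }
  }

  // Serializes a client request or response object to a JSON string
  private String getResponse(JsonpSerializable response) throws IOException {
    StringWriter writer = new StringWriter();
    JsonGenerator generator = new JacksonJsonpGenerator(new JsonFactory().createGenerator(writer));
    response.serialize(generator, new JacksonJsonpMapper());
    generator.flush();
    return writer.toString();
  }
}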

The controller loops through the operations in the request body and uses a switch statement to build the corresponding bulk operation for each OperationType.
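The dispatch step can be exercised without an Elasticsearch dependency. The sketch below is a hypothetical, dependency-free stand-in: its Operation record and OperationType enum mirror the model class, and instead of building client objects it records which bulk action each entry maps to. It also validates IDs up front, so a DELETE or UPDATE without an ID fails fast rather than producing an incomplete bulk entry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical, dependency-free stand-in for the controller's switch.
public class BulkDispatchSketch {

    // Mirrors the OperationType values used in the controller.
    public enum OperationType { CREATE, DELETE, INDEX, UPDATE }

    // Mirrors BulkRequest.Operation (type, index, id, payload).
    public record Operation(OperationType type, String index, String id, Object payload) {}

    private static boolean hasId(Operation op) {
        return op.id() != null && !op.id().isEmpty();
    }

    // Maps each operation to the bulk action name it produces.
    public static List<String> toActions(List<Operation> operations) {
        List<String> actions = new ArrayList<>();
        for (Operation op : operations) {
            switch (op.type()) {
                case CREATE -> actions.add("create");
                case INDEX -> actions.add("index");
                case DELETE, UPDATE -> {
                    if (!hasId(op)) {
                        throw new IllegalArgumentException(op.type() + " requires a document id");
                    }
                    actions.add(op.type().name().toLowerCase(Locale.ROOT));
                }
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<String> actions = toActions(List.of(
            new Operation(OperationType.INDEX, "employee", "1", "{\"name\":\"Alice\"}"),
            new Operation(OperationType.DELETE, "employee", "1", null)));
        System.out.println(actions); // [index, delete]
    }
}
```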

Start the Spring Boot application and send a request to the /bulk endpoint, for example from Postman. Because the controller builds each bulk operation from its type, a single generic endpoint can handle any mix of create, index, update, and delete operations in one request, and the JSON response returned by Elasticsearch reports the outcome of each item, which makes the results easy to verify.
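If you'd rather script the request than use Postman, the JDK's built-in HttpClient (Java 11+) can send the same kind of body. This sketch assumes the application listens on Spring Boot's default port 8080 and that /bulk accepts POST requests with a JSON body; the class name BulkEndpointClient and the sample operations and field names are illustrative.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the /bulk endpoint.
public class BulkEndpointClient {

    // Sample body: one entry per operation, matching the fields of
    // BulkRequest.Operation (type, index, id, payload).
    public static final String SAMPLE_BODY = """
        {
          "operations": [
            { "type": "INDEX",  "index": "employee", "id": "1",
              "payload": { "name": "Alice", "role": "Engineer" } },
            { "type": "CREATE", "index": "employee", "id": "2",
              "payload": { "name": "Bob", "role": "Analyst" } },
            { "type": "UPDATE", "index": "employee", "id": "1",
              "payload": { "role": "Lead Engineer" } },
            { "type": "DELETE", "index": "employee", "id": "2" }
          ]
        }
        """;

    // Builds the POST request; baseUrl is assumed, e.g. http://localhost:8080.
    public static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/bulk"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(SAMPLE_BODY))
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
            client.send(buildRequest("http://localhost:8080"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body()); // per-item results from Elasticsearch
    }
}
```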
