Updated: Oct 23
Scenario: The software generates sub-ledger accounting entries from ledger entries and is offered in SaaS mode to investment bank and hedge fund clients. In simplified form, processing involves three steps:
extract trades and balances from the ledger system,
transform them and perform calculations, and
persist the results in the sub-ledger database.
The existing software is a combination of Perl and Java, deployed in single-tenant mode with one instance per client. Requests come from a scheduler and on demand from users throughout the day. The larger requests pull in 100 million entries on average from the ledger for processing.
The Ask: The software is to be completely redesigned in Java and made multi-tenant, with all clients served by a single cluster. This means the new version must process the current volume of a few billion records and also support 10X that volume for future business. A 100M-record file is expected to be processed within 10 minutes in the new version.
The Solution: Below is the design proposed by 91social, with details of the request processing flow.
The explanation covers the choices made from design through coding, as well as the hardware and software configurations.
The Java processes form a stateless cluster. Requests arrive through a durable queue, and a distributed lock prevents the same entity from being processed concurrently on more than one node of the cluster.
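A minimal sketch of the worker side of this, assuming a hypothetical `EntityLock` interface (the text does not name the queue or lock product). A real deployment would back it with a distributed lock service and a durable queue; here a `ConcurrentHashMap`-based lock and a `LinkedBlockingQueue` stand in so the example runs in one JVM. A worker that cannot acquire the entity's lock puts the request back instead of processing it.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical lock abstraction; in the cluster this would be backed by a distributed lock service.
interface EntityLock {
    boolean tryLock(String entityId);
    void unlock(String entityId);
}

// In-memory stand-in so the sketch runs in a single JVM; it does NOT coordinate across nodes.
class InMemoryEntityLock implements EntityLock {
    private final Set<String> held = ConcurrentHashMap.newKeySet();
    public boolean tryLock(String entityId) { return held.add(entityId); }
    public void unlock(String entityId) { held.remove(entityId); }
}

// Hypothetical request shape: which entity to process, plus an opaque payload.
final class Request {
    final String entityId;
    final String payload;
    Request(String entityId, String payload) { this.entityId = entityId; this.payload = payload; }
}

// Stateless worker: everything it needs arrives with the request.
final class Worker {
    private final BlockingQueue<Request> queue; // stands in for the durable request queue
    private final EntityLock lock;

    Worker(BlockingQueue<Request> queue, EntityLock lock) {
        this.queue = queue;
        this.lock = lock;
    }

    /** Takes one request if available; returns true if it was processed, false if none or requeued. */
    boolean pollOnce() {
        Request req = queue.poll();
        if (req == null) return false;
        if (!lock.tryLock(req.entityId)) {
            queue.offer(req);  // entity is busy elsewhere: put it back and retry later
            return false;
        }
        try {
            process(req);
            return true;
        } finally {
            lock.unlock(req.entityId); // always release, even if processing fails
        }
    }

    private void process(Request req) {
        // extract -> transform -> persist for req.entityId would run here
    }
}
```

Requeueing is only one option; with a real durable queue the equivalent might be a negative acknowledgement or delayed redelivery, depending on the product.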
Processing is modeled as a series of steps (like Unix pipes) connected via in-memory queues and thread pools; refer to the Request Processing Detail Flow. The target is a throughput of 0.5M records per second (rps) at every step, while keeping memory usage per request minimal and constant. This lets multiple requests be processed within the same JVM, with further scaling across JVMs.
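As a rough check of how the per-step target relates to the 10-minute requirement for a 100M-record file, here is a back-of-the-envelope calculation. It assumes the steps run concurrently, so the slowest step bounds the whole pipeline, and it ignores file transfer and startup time.

```latex
\begin{aligned}
r_{\text{required}} &= \frac{100 \times 10^{6}\ \text{records}}{10 \times 60\ \text{s}} \approx 1.67 \times 10^{5}\ \text{records/s} \\
t_{\text{request}} &\approx \frac{100 \times 10^{6}\ \text{records}}{0.5 \times 10^{6}\ \text{records/s}} = 200\ \text{s} \;\le\; 600\ \text{s}
\end{aligned}
```

Under those assumptions, a pipeline whose slowest step sustains 0.5M rps would leave roughly 3X headroom against the target.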
Files are transferred from the remote ledger machines to the local JVM nodes via SCP, as this involves a single network trip between source and destination. Files are stored in S3 for auditing.
Files are compressed at the source to reduce network usage, since bandwidth is a shared resource and the additional CPU cost is marginal. Compression reduces CSV data files by about 95%.
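The article does not say which codec is used. The sketch below assumes gzip via the JDK's `java.util.zip` streams and shows a compress/decompress round trip plus the savings calculation. It works on in-memory byte arrays for brevity; a real transfer of large files would wrap file streams instead, and the 95% figure depends entirely on the real data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class CsvCompression {
    /** Compresses a whole payload with gzip (the codec is an assumption, not stated in the text). */
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // read only after close() has written the gzip trailer
    }

    /** Restores the original bytes on the receiving node. */
    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Percentage of bytes saved, e.g. 95.0 means the compressed file is 5% of the original. */
    static double savingsPercent(long originalBytes, long compressedBytes) {
        return 100.0 * (originalBytes - compressedBytes) / originalBytes;
    }
}
```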
JVM nodes use local disks rather than network disks for faster disk I/O. A processing failure requires pulling the file over the network again, but failures are rare, so an occasional duplicate pull is an acceptable cost.
Four local disks are mounted on each JVM node to scale disk I/O. Logging is kept to a minimum and written to a separate disk. Three different CPUs in a similar price range were benchmarked before one was chosen.
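One simple way to use several data disks, sketched under the assumption (not stated in the text) that each request's working files are assigned to disks round-robin. `DiskSelector` and the `/data1` to `/data4` mount points used to exercise it are hypothetical; the log directory would be kept outside the selector, on its own disk.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: spreads request working files across the node's local data disks.
final class DiskSelector {
    private final List<Path> dataDisks;
    private final AtomicInteger next = new AtomicInteger();

    DiskSelector(List<Path> dataDisks) {
        if (dataDisks.isEmpty()) throw new IllegalArgumentException("at least one data disk is required");
        this.dataDisks = List.copyOf(dataDisks);
    }

    /** Round-robin pick; safe to call from many request threads at once. */
    Path nextDisk() {
        // floorMod keeps the index valid even after the counter overflows to negative values
        return dataDisks.get(Math.floorMod(next.getAndIncrement(), dataDisks.size()));
    }

    /** Working file for a request, placed on the next disk in turn. */
    Path workFileFor(String requestId) {
        return nextDisk().resolve(requestId + ".csv");
    }
}
```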
Spring Boot, the preferred enterprise framework, is used to start long-lived services; the processing code does not use dependency injection (DI), to avoid the overhead of Spring proxies when processing millions of records.
Spring Batch and other batch frameworks do not overlap I/O with CPU work, so the processing steps are custom-coded as threads that run concurrently. The file reader thread reads records, puts them on a local Java queue, and goes back to reading while the processor thread processes them. This gave at least 2X the throughput of the other frameworks in benchmarks.
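A minimal sketch of the reader/processor split using only the JDK: one thread reads lines and hands them to a bounded `ArrayBlockingQueue`, while a second thread consumes them, so I/O and CPU work overlap. The queue capacity, the end-of-input sentinel, and the trivial "processing" (counting non-empty lines) are assumptions for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

final class ReaderProcessor {
    // End-of-input marker, compared by identity so it can never clash with a real line.
    private static final String EOF = new String("EOF");

    /** Reads lines on one thread and processes them on another; returns the number of records processed. */
    static long run(Reader source) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1_000); // bounded hand-off between the threads
        AtomicLong processed = new AtomicLong();
        AtomicReference<Exception> failure = new AtomicReference<>();

        Thread reader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(source)) {
                String line;
                while ((line = in.readLine()) != null) {
                    queue.put(line);     // hand off, then go straight back to I/O
                }
            } catch (IOException | InterruptedException e) {
                failure.set(e);
            } finally {
                try {
                    queue.put(EOF);      // always signal the processor to stop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "file-reader");

        Thread processor = new Thread(() -> {
            try {
                for (String line = queue.take(); line != EOF; line = queue.take()) {
                    // CPU-bound work (parsing, calculations) overlaps with the reader's I/O
                    if (!line.isEmpty()) processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "processor");

        reader.start();
        processor.start();
        try {
            reader.join();
            processor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure.get() != null) throw new IllegalStateException(failure.get());
        return processed.get();
    }
}
```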
The CSV record splitter is custom-coded for speed and to keep memory and CPU usage to a minimum.
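A sketch of a single-character splitter in that spirit, assuming simple CSV with no quoted or escaped delimiters (the article does not describe its format rules). It scans with `String.indexOf` instead of a regular expression and reuses one field array across lines rather than allocating a new array per line. `CsvSplitter` is a hypothetical name.

```java
import java.util.Arrays;

final class CsvSplitter {
    private final char delimiter;
    private String[] fields; // reused across lines; grows only if a line has more fields than seen so far

    CsvSplitter(char delimiter, int expectedFields) {
        this.delimiter = delimiter;
        this.fields = new String[Math.max(1, expectedFields)];
    }

    /** Splits one line and returns the field count. Assumes no quoted or escaped delimiters. */
    int split(String line) {
        int count = 0;
        int start = 0;
        int end;
        while ((end = line.indexOf(delimiter, start)) >= 0) {
            set(count++, line.substring(start, end));
            start = end + 1;
        }
        set(count++, line.substring(start)); // last field, possibly empty
        return count;
    }

    /** Field i of the most recent line; only valid for i below the count returned by split. */
    String field(int i) {
        return fields[i];
    }

    private void set(int index, String value) {
        if (index == fields.length) fields = Arrays.copyOf(fields, fields.length * 2);
        fields[index] = value;
    }
}
```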
Records were processed in mini-batches of 100 to minimize Java queue operations and the associated locking. A batch size of 100 kept the total number of records in memory per request to 400 and ensured that each request operated within 150 MB. Back pressure was enforced through bounded queues.
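A sketch of mini-batching with back pressure, assuming one way the 400-record ceiling could arise: a bounded queue holding two batches, plus one batch being filled by the producer and one being processed by the consumer, gives 4 × 100 = 400 records. The capacity of 2 and the `BatchingStep` class are assumptions; the text gives only the batch size and the total.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BatchingStep {
    static final int BATCH_SIZE = 100;
    // Assumption: 2 queued batches + 1 being filled + 1 being processed = 4 batches in flight.
    static final int QUEUE_CAPACITY = 2;
    // End-of-stream marker, compared by identity.
    private static final List<String> END = new ArrayList<>();

    private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    private List<String> current = new ArrayList<>(BATCH_SIZE); // touched only by the producer thread

    /** Producer side: buffers a record; a full batch costs one queue operation instead of 100. */
    void offer(String record) throws InterruptedException {
        current.add(record);
        if (current.size() == BATCH_SIZE) flush();
    }

    /** Producer side: hands off any partial batch and signals end of stream. */
    void close() throws InterruptedException {
        if (!current.isEmpty()) flush();
        queue.put(END);
    }

    /** Consumer side: returns the next batch, or null once the stream has ended. */
    List<String> nextBatch() throws InterruptedException {
        List<String> batch = queue.take();
        return batch == END ? null : batch;
    }

    /** Upper bound on records held in memory by this step. */
    static int maxRecordsInFlight() {
        return (QUEUE_CAPACITY + 2) * BATCH_SIZE;
    }

    private void flush() throws InterruptedException {
        queue.put(current);               // blocks while the queue is full: this is the back pressure
        current = new ArrayList<>(BATCH_SIZE);
    }

    /** Demo: produces n records on a second thread and consumes them here; returns {records, batches}. */
    static int[] demo(int n) {
        BatchingStep step = new BatchingStep();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) step.offer("rec-" + i);
                step.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();
        int records = 0;
        int batches = 0;
        try {
            for (List<String> b = step.nextBatch(); b != null; b = step.nextBatch()) {
                records += b.size();
                batches++;
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {records, batches};
    }
}
```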
Converting text to BigDecimal objects is expensive, so the 100 most frequently used numbers are taken from a cache instead of being parsed.
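A sketch of such a cache, assuming the frequent values are supplied up front (the text does not say how the 100 are chosen). Because `BigDecimal` is immutable, a cached instance can be shared by every caller without copying. The cache is keyed on the exact text, so `"1"` and `"1.0"` stay distinct values with different scales. `DecimalParser` is a hypothetical name.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

final class DecimalParser {
    // Populated in the constructor and read-only afterwards, so it can be shared across threads.
    private final Map<String, BigDecimal> cache = new HashMap<>();

    DecimalParser(Iterable<String> frequentValues) {
        for (String v : frequentValues) cache.put(v, new BigDecimal(v));
    }

    /** Returns the shared cached instance when the text is a known frequent value, else parses it. */
    BigDecimal parse(String text) {
        BigDecimal hit = cache.get(text);
        return hit != null ? hit : new BigDecimal(text);
    }
}
```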
The JVM heap is set to 2 GB with the new (young) generation at a 3:1 ratio, as most objects are short-lived; this achieved > 99% JVM throughput (time not spent in garbage collection).
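One way to express the 2 GB heap on HotSpot is `-Xms2g -Xmx2g`, with the young generation sized via `-Xmn` or `-XX:NewRatio` (`-XX:NewRatio=3` means old:young = 3:1, i.e. the young generation is a quarter of the heap); the text does not say which side of its 3:1 is the young generation. Reading "JVM throughput" as the share of time not spent in garbage collection, the sketch below estimates it from the standard `java.lang.management` beans; `GcThroughput` is a hypothetical name, and the figure is approximate because collectors report elapsed rather than exclusive time.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

final class GcThroughput {
    /** Approximate fraction of JVM uptime not spent in garbage collection (1.0 = no GC time). */
    static double current() {
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // accumulated elapsed ms, or -1 if not available
            if (t > 0) gcMillis += t;
        }
        long uptimeMillis = ManagementFactory.getRuntimeMXBean().getUptime();
        if (uptimeMillis <= 0) return 1.0;
        return Math.max(0.0, 1.0 - (double) gcMillis / uptimeMillis);
    }
}
```

Logging `GcThroughput.current()` at the end of a benchmark run would give a number directly comparable to the > 99% target.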
Database persistence uses bulk copy rather than SQL inserts or JDBC batches. The result data is first written to files capped at 100K records each, and multiple connections load these files into the database in parallel. Each connection persists 50K rps, so 10 connections provide 500K rps at this step, in line with the 0.5M rps per-step target.
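A sketch of the write-then-load pattern. The article does not name the database or its bulk-copy mechanism, so `bulkLoad` is a hypothetical hook that a real system would implement with the database's own bulk-copy call; in the demo it merely counts the rows in each file. Files are capped at `maxRows`, and a fixed thread pool with one thread per connection loads them in parallel.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToLongFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class BulkPersister {
    /** Writes rows into files of at most maxRows lines each and returns the files in order. */
    static List<Path> writeChunks(Iterator<String> rows, Path dir, int maxRows) {
        List<Path> files = new ArrayList<>();
        try {
            while (rows.hasNext()) {
                Path file = dir.resolve("chunk-" + files.size() + ".csv");
                try (BufferedWriter out = Files.newBufferedWriter(file)) {
                    for (int n = 0; n < maxRows && rows.hasNext(); n++) {
                        out.write(rows.next());
                        out.newLine();
                    }
                }
                files.add(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return files;
    }

    /** Loads chunk files in parallel, one pool thread per connection; returns total rows loaded. */
    static long persist(List<Path> files, int connections, ToLongFunction<Path> bulkLoad) {
        ExecutorService pool = Executors.newFixedThreadPool(connections);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (Path f : files) {
                Callable<Long> task = () -> bulkLoad.applyAsLong(f); // one file per task
                results.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> r : results) total += r.get();
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("bulk load failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** Demo with a fake loader that only counts lines; returns {files written, rows loaded}. */
    static long[] demo(int rows, int maxRows, int connections) {
        try {
            Path dir = Files.createTempDirectory("bulk-chunks");
            Iterator<String> data = IntStream.range(0, rows).mapToObj(i -> "row-" + i + ",100.00").iterator();
            List<Path> files = writeChunks(data, dir, maxRows);
            long loaded = persist(files, connections, BulkPersister::countLines);
            for (Path f : files) Files.delete(f);
            Files.delete(dir);
            return new long[] {files.size(), loaded};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for a real bulk-copy call; it just counts the rows in the file.
    private static long countLines(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the article's figures, `maxRows` would be 100,000 and `connections` 10.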