Deposit Queue Design

Overview

This document describes the architecture and implementation of the CS Queue Server and its supporting components. The service is responsible for the automated batch processing of metadata deposits and user queries.

The following diagram depicts the main Java components and the general flow between them. Numbered aspects of the diagram are annotated below the diagram.

  1. A depositor posts a deposit metadata XML file to an admin service URL over HTTPS. The post can contain a user identifier and password.
  2. The post is recognized and various parts of the request are parsed out into a submission request object.
  3. The upload controller queries the queue server for the number of items the depositor currently has enqueued and, if that number exceeds a certain limit, returns an HTTP 429 (Too Many Requests) response.
  4. Assuming the submission passes initial checks with the controller, the upload request object is passed off to an upload coordinator instance. Note that the submission request can be either for depositing metadata, or for posting a user query.
  5. The upload coordinator will authorize the request with the help of an authorization service.
  6. If the request is authorized the coordinator will pass the submission request along to an upload submission service object.
  7. The upload service object will create a unique, generically named metadata file containing the particulars of the submission request, and place that file in a generic working upload folder for the specified user (based on their identifying info on the URL), e.g. /upload-files/crossref/submissions/L/. At this point, a response is returned to the original caller indicating that the request has been received.
  8. Meanwhile, a singleton “file transfer” object loaded into the Spring context polls for files placed in the generic root folder (above). This object does not poll the directory if the admin option to allow upload notifications is disabled.
  9. When a file is found in the previous step, it is first passed on to a “factory” object.
  10. The factory object does two basic things: 1) stores the original content of the post on S3, and 2) stores a record in the database capturing details of the post.
  11. Once the factory instance has completed its work in the prior step, the file transfer object will invoke a method in the queue server instance which adds the submission to the queue server by way of the database id of the submission db record.

    Within that call, the id is used to fetch the submission object from the database, and it is this object that is then passed to the addSubmission call of the specific internal queue instance, either the metadata queue, or the query queue. The type of submission determines whether the item to be processed gets added to one or the other.

    Internally, the queue implementation manages submissions by user, so when asked to add a submission in this step, the queue object converts the submission object to a “user submission” object (SubmissionQueueImpl.User) and caches this object in an internal list. This object identifies the user and maintains a double-ended queue containing the user’s pending submissions.

  12. There is another singleton instance loaded into the Spring context which acts as a container for managing processing threads. This singleton instance is injected with an instance of the queue server when loaded into the Spring context.

    As this object is asked to create new processing threads, it in turn injects an instance of the queue server into the processing threads so that they can poll the queue server per the next step.

  13. The individual threads, which are started within the thread manager, poll the queue server for submissions that are targeted to the thread. That is, each processing thread has properties that are used to filter items in the queue so that the thread only receives items meant for it. A processing thread will “grab” the next matching item from the queue and process it.

The queue server will not yield any submission ids to process if the queue server is suspended, in which case submissions will continue to accumulate within the queue server. Assuming the queue is not suspended, the following describes how the queue server selects a suitable submission for a thread to work on.
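The per-user bookkeeping from step 11 and the suspended behaviour just described can be sketched as follows. This is a minimal illustration, not the actual SubmissionQueueImpl: the class name UserQueueSketch and the methods pendingCount, setSuspended and grabNext are hypothetical, submissions are reduced to their database ids, and the per-thread filtering described below is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Illustrative stand-in for the queue's per-user bookkeeping; not the real SubmissionQueueImpl. */
class UserQueueSketch {

    /** Identifies a user and holds the ids of that user's pending submissions in arrival order. */
    static final class User {
        final String name;
        final Deque<Long> pending = new ArrayDeque<>();
        User(String name) { this.name = name; }
    }

    private final List<User> users = new ArrayList<>();
    private boolean suspended;

    /** Step 11: append the submission id to its user's deque, creating the user entry on first use. */
    synchronized void addSubmission(String userName, long submissionId) {
        for (User u : users) {
            if (u.name.equals(userName)) {
                u.pending.addLast(submissionId);
                return;
            }
        }
        User created = new User(userName);
        created.pending.addLast(submissionId);
        users.add(created);
    }

    /** Number of submissions a user has enqueued (the figure the step 3 limit check asks for). */
    synchronized int pendingCount(String userName) {
        for (User u : users) {
            if (u.name.equals(userName)) return u.pending.size();
        }
        return 0;
    }

    synchronized void setSuspended(boolean suspended) { this.suspended = suspended; }

    /** Step 13 without filtering: hand out the oldest id of the first user, or nothing while suspended. */
    synchronized Optional<Long> grabNext() {
        if (suspended || users.isEmpty()) return Optional.empty();
        User first = users.get(0);
        long id = first.pending.removeFirst();        // user entries are dropped once empty, so this never fails
        if (first.pending.isEmpty()) users.remove(0); // the list shrinks as users run out of work
        return Optional.of(id);
    }
}
```

While the sketch is suspended, addSubmission keeps accepting work and grabNext returns nothing, which mirrors the accumulation described above.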

A processing thread obtains a single submission to work on via the queue server’s grabSubmission method. This call accepts the following arguments:

  • submissionTypes - list of submission types the thread handles - typically 1 entry, metadata or query
  • maxSubmissionSize - max submissions identified by thread
  • spid - thread identifier (generated by thread manager)
  • users - a space separated list of “remote” user names to consider for selection. If none are specified, then any user’s submission may be selected
  • excludeUsers - indicates whether the given users are to be excluded, as opposed to included.
  • lowPriorityUsers - this input is ignored
  • preferredUsers - a space separated list of “remote” user names considered to have high priority

The logic for selecting a submission based on the above inputs is as follows:

  1. Determine which of the two internal queues, metadata or query, to grab from by examining the list of given submission types. If the list contains the metadata type, the metadata queue is selected. Otherwise, if the list contains the query type, the query queue is selected. If it contains neither, nothing is selected and nothing happens.

  2. From the given inputs, three sets of identifiers are calculated per request:

    a. The set of ids for users whose submissions will be included for selection - parsed from the given users string if exclude users is false, or the empty set otherwise.
    b. The set of ids for users whose submissions will be excluded from selection - parsed from the given users string if exclude users is true, or the empty set otherwise.
    c. The set of ids for users who are to be considered preferred - parsed from the given preferred user ids.
  3. Sets b and c are subsequently modified by taking into account a list of user ids that are prohibited from the type of round-robin selection employed by this logic. This set of ids is maintained globally within the queue server and can be initialized on app startup and/or set dynamically at runtime. Any user ids in this set are removed from c (preferred) and added to b (excluded).

    The three sets of ids from step #2 are passed to the queue object determined in step #1 via its fetchSubmission method. These sets of ids, in combination with global settings maintained within the queue object itself, are then used to determine a selection from the queue.

    Step #11 mentioned that the queue manages a list of “user submission” objects. The list grows and shrinks as items are put on by step #11 and pulled off by step #13. The growing, shrinking, and accessing of the list is performed by public methods provided by the queue implementation.

    Because the same queue is accessed by multiple processing threads, all operations on it are synchronized; for example, two threads cannot add a submission to the queue simultaneously. The operation is fast, so the wait is minimal. The public fetchSubmission method is not declared as synchronized, but internally it synchronizes access in a more fine-grained manner.

    Another aspect of the queue implementation is that it maintains an internal “cursor”. This is the index of the last user submission object being worked on from the main list. Given the inputs described above, and the notion of a “cursor” previously mentioned, selection logic for choosing a submission is roughly as follows.

    If required user IDs were specified, pull out the first user submission from the queue whose user ID matches one of the given required IDs, and for which the user is under their allotted allocation. If no items in the queue satisfy these criteria, don’t return anything - this request to fetch a submission will be ignored.

    If no required IDs were specified, but a set of preferred IDs was given, pull out the first user submission from the queue that matches any of the preferred IDs, as above, but start looking at the current cursor location. If no items in the queue satisfy these criteria, move on to the next check.

    If the logic has made it this far, the next submission for the user submission object located at the current cursor will be removed from the queue and returned.
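The selection order described above (required users, then preferred users from the cursor, then plain round-robin) can be sketched as follows. This is a simplified illustration under stated assumptions, not the real implementation: apart from fetchSubmission, all class and method names are hypothetical; the concurrency check is omitted; empty user entries are kept in the list and skipped rather than removed, to keep the cursor arithmetic short; and only the round-robin path advances the cursor, which the text does not specify.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/** Illustrative model of cursor-based selection; names other than fetchSubmission are hypothetical. */
class SelectionSketch {

    static final class User {
        final String name;
        final Deque<Long> pending = new ArrayDeque<>();
        User(String name) { this.name = name; }
    }

    private final List<User> users = new ArrayList<>();
    private final Set<String> prohibited = new HashSet<>();
    private int cursor; // index of the user entry the round-robin scan starts from

    synchronized void addSubmission(String userName, long id) {
        for (User u : users) {
            if (u.name.equals(userName)) { u.pending.addLast(id); return; }
        }
        User created = new User(userName);
        created.pending.addLast(id);
        users.add(created);
    }

    synchronized void setProhibitedUsers(Collection<String> names) {
        prohibited.clear();
        prohibited.addAll(names);
    }

    /** include, exclude and preferred are the three sets computed in step 2. */
    synchronized Optional<Long> fetchSubmission(Set<String> include, Set<String> exclude, Set<String> preferred) {
        // Step 3: prohibited users join the excluded set and leave the preferred set.
        Set<String> excluded = new HashSet<>(exclude);
        excluded.addAll(prohibited);
        Set<String> favoured = new HashSet<>(preferred);
        favoured.removeAll(prohibited);

        if (!include.isEmpty()) {
            // Required users: first match from the head of the list, otherwise nothing at all.
            return take(0, include::contains, false);
        }
        if (!favoured.isEmpty()) {
            // Preferred users: first match starting at the cursor; fall through when none has work.
            Optional<Long> hit = take(cursor, favoured::contains, false);
            if (hit.isPresent()) return hit;
        }
        // Round-robin: next non-excluded user at or after the cursor.
        return take(cursor, name -> !excluded.contains(name), true);
    }

    /** Scans the list circularly from start and removes the oldest id of the first acceptable user. */
    private Optional<Long> take(int start, Predicate<String> accept, boolean advance) {
        int n = users.size();
        for (int i = 0; i < n; i++) {
            int idx = (start + i) % n;
            User u = users.get(idx);
            if (u.pending.isEmpty() || !accept.test(u.name)) continue;
            if (advance) cursor = (idx + 1) % n;
            return Optional.of(u.pending.removeFirst());
        }
        return Optional.empty();
    }
}
```

In this sketch a prohibited user is never reached by the round-robin or preferred paths; only a caller that names that user in the include set receives the user's submissions.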

Deployment

The queue server process is deployed in the same manner as all CS instances in the DataCenter and currently runs on the VM called cr8, also referred to as qserver.crossref.org. Its deployment number is #9.

Configuration

The queue configuration is mostly defined in the context files and the deployment properties WEB-INF/deployments/deployment-architype-queue.properties and WEB-INF/deployments/deployment-cddb2-9.properties.

Some settings can be reconfigured dynamically via JMX at qserver.crossref.org:8091.

There are 3 parts to the queue that can be configured:

  1. The queue server itself.
  2. The metadata submissions queue.
  3. The query submissions queue.

The settings most likely to change on an as-needed basis are the user allocations and user concurrency for each of the two submissions queues.

The submissions are grouped by user and the users are organized into a ring (so the first user links to the second user and the last user links to the first user). Submissions are then distributed to Submission Processors (SPs) so that each user has one submission processed by the next SP. That is, we are using round-robin allocation of submissions by user to SPs. Each user gets one submission allocated to an SP per round-robin cycle. This allocation can be changed for all users or for specific users.

Each user is also given a maximum, system-wide deposit concurrency. This ensures that no one user will have submissions being deposited on all the SPs. Concurrency affects both round-robin and required user submission distribution. The JMX mbeans are:

  • qs.queueserver:name=queueServer
  • qs.queueserver:name=metadataSubmissionQueueManager
  • qs.queueserver:name=querySubmissionQueueManager

Concurrency and Allocation

The default concurrency and allocations are set via metadataSubmissionQueueManager and querySubmissionQueueManager mbean attributes:

  • DefaultConcurrency
  • DefaultAllocation

The user-specific concurrency and allocations are set via metadataSubmissionQueueManager and querySubmissionQueueManager mbean operations:

  • assignUsersDefaultConcurrencies
  • assignUsersDefaultAllocations

Both of these operations take a space (or comma) delimited list of user names and a specific integer value. Setting concurrency to 0 will effectively prohibit the user’s submissions from being processed. Setting the allocation to 0 will effectively exclude the user from round-robin submission distribution.
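The effect of these defaults and per-user overrides can be modelled as follows. This is a minimal sketch, not the mbean implementation: the class UserLimitsSketch and the helpers mayStartAnother and inRoundRobin are hypothetical, and the (String, int) parameter shape of the two operations is inferred from the description above.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of default and per-user concurrency and allocation settings. */
class UserLimitsSketch {
    private final int defaultConcurrency;
    private final int defaultAllocation;
    private final Map<String, Integer> concurrencyByUser = new HashMap<>();
    private final Map<String, Integer> allocationByUser = new HashMap<>();

    UserLimitsSketch(int defaultConcurrency, int defaultAllocation) {
        this.defaultConcurrency = defaultConcurrency;
        this.defaultAllocation = defaultAllocation;
    }

    /** Overrides the concurrency for each user in a space or comma delimited list. */
    void assignUsersDefaultConcurrencies(String users, int value) {
        for (String name : users.split("[\\s,]+")) {
            if (!name.isEmpty()) concurrencyByUser.put(name, value);
        }
    }

    /** Overrides the per-cycle allocation for each user in a space or comma delimited list. */
    void assignUsersDefaultAllocations(String users, int value) {
        for (String name : users.split("[\\s,]+")) {
            if (!name.isEmpty()) allocationByUser.put(name, value);
        }
    }

    /** True when the user is below their concurrency cap; a cap of 0 blocks all processing. */
    boolean mayStartAnother(String user, int submissionsInFlight) {
        return submissionsInFlight < concurrencyByUser.getOrDefault(user, defaultConcurrency);
    }

    /** True when the user takes part in round-robin distribution; an allocation of 0 opts them out. */
    boolean inRoundRobin(String user) {
        return allocationByUser.getOrDefault(user, defaultAllocation) > 0;
    }
}
```

For example, after assignUsersDefaultConcurrencies("creftest, taylor", 0), mayStartAnother returns false for both users regardless of how many submissions they have in flight, while other users continue to use the default.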

Prohibited Users

It is sometimes necessary to prohibit one or more users’ submissions from being processed, or to restrict one or more users’ submissions to being processed by specific SPs. These users are specified using the queueServer mbean’s ProhibitedUsers attribute. For example: creftest, taylor, springer

A prohibited user’s submission is allocated only to an SP that explicitly requires that user. Note that this configuration is at the queue server level and so affects both the metadata and query queues. So, for example, if you make creftest a prohibited user you would need to create a creftest-specific metadata SP and a creftest-specific query SP.

Submission Processor Configuration

There are 2 kinds of Submission Processors (SPs):

  • Metadata Deposit SPs
  • Batch Query SPs

The SPs run on 4 DS machines:

  • DS2-1 runs batch queries.
  • DS3-1, DS4-1, and DS5-1 run metadata deposits
  • DS3-2, DS4-2, and DS5-2 run synchronous deposits (and so don’t use the queue)

Each metadata SP and the query SP is configured with submission selection criteria. The criteria are:

  • Required users
  • Prohibited users, and
  • Preferred users

The SPs are normally configured and started using the HTTP interface to JMX. The SPs can also be configured and started using the mbeans:

  • qs:name=dataSubmissionProcessorManager
  • qs:name=querySubmissionProcessorManager

The input fields “userFilter” and “excludeUsers” are used in conjunction to set the required or prohibited criteria. The user filter is a comma-delimited list of user names. When the exclude users flag is false the users are required; when it is true the users are prohibited. (Note that this logic is backwards from common expectations regarding a boolean flag.) The preferred users field is a list of users whose submissions the SP should be given first. If no preferred user has a pending submission to deposit, the SP is given the submission of the next non-prohibited round-robin user.
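The way the two fields combine can be sketched as follows. This is an illustration only: the class SpCriteriaSketch and its methods are hypothetical and are not part of the SP manager mbeans, and the parser accepts both commas and whitespace as separators as a convenience.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative mapping of userFilter/excludeUsers onto required and prohibited user sets. */
class SpCriteriaSketch {
    final Set<String> required;
    final Set<String> prohibited;
    final Set<String> preferred;

    private SpCriteriaSketch(Set<String> required, Set<String> prohibited, Set<String> preferred) {
        this.required = required;
        this.prohibited = prohibited;
        this.preferred = preferred;
    }

    /** excludeUsers == false makes the filter a required list; true makes it a prohibited list. */
    static SpCriteriaSketch from(String userFilter, boolean excludeUsers, String preferredUsers) {
        Set<String> filter = parse(userFilter);
        Set<String> empty = new LinkedHashSet<>();
        return excludeUsers
                ? new SpCriteriaSketch(empty, filter, parse(preferredUsers))
                : new SpCriteriaSketch(filter, empty, parse(preferredUsers));
    }

    /** Splits a comma or whitespace delimited list of user names, ignoring blanks. */
    static Set<String> parse(String names) {
        Set<String> out = new LinkedHashSet<>();
        for (String name : names.split("[\\s,]+")) {
            if (!name.isEmpty()) out.add(name);
        }
        return out;
    }

    /** Whether an SP with these criteria may be handed a submission from the given user. */
    boolean mayReceive(String user) {
        return required.isEmpty() ? !prohibited.contains(user) : required.contains(user);
    }
}
```

With from("creftest", false, "") the SP only ever receives creftest submissions, whereas from("creftest", true, "") yields an SP that receives everything except creftest.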

“Parking” Submissions

This refers to updating submission records in the database so that they will not be picked up and processed. This simply involves setting the start and finish dates on records in the SUBMISSIONS_NEW table to the same value. Here is a sample query that does this for submissions received within a specified time range:

update SUBMISSIONS_NEW
   set started = to_date('2019-06-24 10:00:00', 'YYYY-MM-DD HH24:MI:SS'),
       finished = to_date('2019-06-24 10:00:00', 'YYYY-MM-DD HH24:MI:SS')
 where received between to_date('2019-06-24 06:00:00', 'YYYY-MM-DD HH24:MI:SS')
                    and to_date('2019-06-24 10:00:00', 'YYYY-MM-DD HH24:MI:SS')
   and finished is null
   and started is null;