Latency numbers every programmer should know (with Humanized Scale): Gist
Latency numbers for 2020s: ByteByteGo Video
Estimation Parameters:
Example: Video
Approximation:
1 million req/day = 12 req/sec
Also known as Throttling. Client-side rate limiting is susceptible to forging; server-side is preferred, with a reverse proxy (rate-limiting middleware) placed before the API server.
We need to implement throttling per API endpoint per User depending on the design.
Performed in networks too as part of Congestion Control (use iptables at the network layer to set quotas for IP addresses).
Protects against DoS attacks, reduces load, and saves cost and bandwidth.
Token Bucket
- instead of refilling the buckets of millions of users (write heavy) according to the refill rate, refill only upon the next request for a single user, based on the time diff since that user's last request (smart; see the sketch below)
- accumulates tokens only up to the bucket size, after which requests are rejected
- bucket size, refill rate
"user_1": {"tokens": 1, "ts": 1490868000}
Leaky Bucket
- requests are put into a FIFO queue, and subsequently picked from there based on the outflow rate parameter
- bucket size, outflow rate
Fixed window counter
- create a new counter (keyed with the current window's start time as ts) upon a user's first request in a window, increment it on every subsequent request in the same window, and reject requests once the count exceeds the window size; remove key-value entries at each window's end so stale ones don't stick around forever
- window duration, window size
- we can entirely skip ts in key but we'll need that in Sliding Window Counter approach below
"user_1_1490868000": 1
Sliding window log
- store a log entry (in a sorted set) for every incoming request, discard outdated entries (log_ts < current_ts - window_duration), and check whether the log size is below the window size limit; reject the request if it isn't
- window duration, window size
"user_1" : {
1490868000
1490868010
1490868020
}
NOTE: The approach described in most sources, including the book, has a major caveat: if requests keep coming in, the system won't be able to process any of them after a while, since rejected requests keep filling up the log for the rolling window. Store log entries only for successful requests to avoid this. Reference
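A rough sketch of the log-based check that also applies the fix from the note (only accepted requests are logged); a real deployment would use Redis sorted sets, but plain Python lists show the logic:

```python
import time
from collections import defaultdict

WINDOW_DURATION = 60   # seconds
WINDOW_SIZE = 100      # max accepted requests per rolling window

logs = defaultdict(list)  # user_id -> timestamps of accepted requests

def allow_request(user_id: str) -> bool:
    now = time.time()
    window_start = now - WINDOW_DURATION
    # drop outdated entries (log_ts < current_ts - window_duration)
    logs[user_id] = [ts for ts in logs[user_id] if ts >= window_start]
    if len(logs[user_id]) < WINDOW_SIZE:
        logs[user_id].append(now)  # log only successful requests (see note above)
        return True
    return False  # reject without logging, so the window can drain
```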
Sliding window counter
- store a counter for each fixed-size window as well as the logs
- for an incoming request, increment the counter for the corresponding fixed-window ts, sum all counters in the current log, and allow the request if the total is below the limit
- remove all fixed-window counter logs after each window duration
- fixed-window duration (smaller), window duration, window size
"user_1" : {
1490868000: 1,
1490868010: 2
}
Reference: https://www.figma.com/blog/an-alternative-approach-to-rate-limiting
Return HTTP response code 429 - Too Many Requests. Also, return response headers (like X-Ratelimit-RetryAfter) to let the user know they are being throttled and how long they need to wait (backoff/cooldown/retry after).
Hard Limit: stop exactly after a given number of requests.
Soft Limit: less strict enforcement, continues to process some additional requests even after the threshold has been crossed.
In a distributed environment, a single rate limiter cache (counter, bucket, etc.) can face race conditions. Sorted sets in Redis can resolve this since their operations are atomic.
If we use multiple rate limiters, we need synchronization so that every rate limiter knows the current counter value for a given user and they can be used interchangeably. Use a shared cache accessed by all the rate limiters.
Efficiently choosing a server for our request in a distributed and scalable environment. Used wherever we need to choose one server among many, e.g. choosing partitions, load balancing, etc.
Ex - Amazon DynamoDB, Apache Cassandra, Akamai CDN, nearly all Load Balancers, etc.
Use a uniformly distributed hash function (hash), hash request keys (key), and perform modulo on the output with the server pool size (N) to route to a server.
serverIndex = hash(key) % N
It can uniformly distribute requests across existing servers, but scaling is an issue - if servers are added or removed, all requests need to be remapped with a new N, and we lose cached data for all requests (cache misses) since servers get reassigned for every request.
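A quick illustration of how badly the modulo approach remaps keys when N changes (key names and server counts are made up):

```python
from hashlib import sha1

def server_index(key: str, n_servers: int) -> int:
    # uniform hash of the key, then modulo the server pool size
    h = int(sha1(key.encode()).hexdigest(), 16)
    return h % n_servers

keys = [f"user_{i}" for i in range(1000)]
before = {k: server_index(k, 4) for k in keys}   # 4 servers
after = {k: server_index(k, 5) for k in keys}    # add a 5th server
moved = sum(1 for k in keys if before[k] != after[k])
print(f"{moved}/{len(keys)} keys remapped")      # roughly 80% of keys move
```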
In case of addition or removal of servers, we shouldn't need to remap all the keys but only a subset of them - roughly a 1/N-th portion of the hash space.
Approach: Hash requests based on some key and hash servers based on IP using the same hash function (hash). Ex - use SHA-1 for 0 to (2^160)-1 possible hashes. This ensures equal hash space for both requests and servers, and we don't need to perform a modulo operation here. No change in architecture is required here, unlike classical hashing which required changing the server pool size (N) parameter upon removal or addition of servers.
For each incoming request, if a server is not available at that exact hash, probe clockwise (linear or binary search) and use whichever server is encountered first. Each server is now responsible for handling requests in a particular range of hashes on the ring (a partition).
Addition and removal of servers won’t bother us now since probe will take care of finding servers.
Find affected keys - if a server is added/removed, move anti-clockwise from that server and all keys till the first server is encountered will have to be remapped.
Non-uniform distribution - some servers may get the majority of the traffic (hotspots), while others sit idle based on their position on the hash ring.
If we’re probing clockwise, there might be a server that is closer but in the anti-clockwise direction of the hash; we place redundant Virtual Nodes (aka Replicas) across the ring to resolve this.
Virtual nodes route to an actual server on the ring, and they need not be adjacent servers of the virtual node.
Place enough virtual nodes across the hash space that the response time of a request is minimized (it is directly proportional to the distance to the nearest node it can connect to). If multiple servers go down in sequence, we can jump ahead using virtual nodes.
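A minimal ring sketch with virtual nodes and clockwise binary search via bisect; the virtual-node count, SHA-1 hashing, and class layout are illustrative assumptions:

```python
import bisect
from hashlib import sha1

class ConsistentHashRing:
    def __init__(self, virtual_nodes: int = 100):
        self.virtual_nodes = virtual_nodes
        self.ring = []      # sorted list of virtual-node hashes
        self.owner = {}     # virtual-node hash -> real server

    def _hash(self, value: str) -> int:
        return int(sha1(value.encode()).hexdigest(), 16)

    def add_server(self, server: str):
        for i in range(self.virtual_nodes):
            h = self._hash(f"{server}#vn{i}")   # place virtual nodes on the ring
            bisect.insort(self.ring, h)
            self.owner[h] = server

    def remove_server(self, server: str):
        for i in range(self.virtual_nodes):
            h = self._hash(f"{server}#vn{i}")
            self.ring.remove(h)
            del self.owner[h]

    def get_server(self, key: str) -> str:
        # probe clockwise: first virtual node at or after the key's hash, wrapping around
        h = self._hash(key)
        idx = bisect.bisect_left(self.ring, h) % len(self.ring)
        return self.owner[self.ring[idx]]

ring = ConsistentHashRing()
for s in ["10.0.0.1", "10.0.0.2", "10.0.0.3"]:
    ring.add_server(s)
print(ring.get_server("user_42"))
```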
Cascading failures: subsequent servers crash if one gets swamped with requests.
Hotspots: non-uniform distribution of nodes and data.
Memory costs and operational complexity increase in general with Consistent Hashing.
Data Partition - Consistent Hashing (route requests to partitions based on key's hash space they handle)
Data Replication - Consistent Hashing (copy data onto the next three unique servers towards the clockwise dir)
Consistency - Quorum Consensus
Inconsistency Resolution - Versioning (Vector Clock)
Failure Detection - Gossip Protocol
Handling Temporary Failures - Sloppy Quorum with Hinted Handoff
Handling Permanent Failures - Anti-Entropy Protocol with Merkle Tree
Fast Lookup - Bloom Filter
Ensures data read and write consistency across replicas.
Approach: The client sends a request to a coordinator node; all replicas are connected to the coordinator. We need at least W replicas to perform and acknowledge a write operation, and at least R replicas to perform and acknowledge a read operation, for it to be declared successful, where N is the number of nodes that store replicas of a particular piece of data (and not the total number of replica nodes in the system).
Configuration:
If R = 1, the system is optimized for fast reads
If W = 1, the system is optimized for fast writes
If W + R > N, strong consistency is guaranteed (there is always at least one node overlapping between reads and writes)
If W + R <= N, strong consistency is not guaranteed
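A tiny worked check of the condition, with typical Dynamo-style values assumed for the example:

```python
# N = replicas per key, W = write acks required, R = read acks required
def is_strongly_consistent(n: int, w: int, r: int) -> bool:
    # W + R > N guarantees at least one replica overlaps between the read and write sets
    return w + r > n

print(is_strongly_consistent(3, 2, 2))  # True  (overlap guaranteed)
print(is_strongly_consistent(3, 1, 1))  # False (fast, but only eventually consistent)
```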
In case of a read where we get different values of the same data object from different replicas, we use versioning to differentiate them. Ex - (data, timestamp): we keep the data with the latest timestamp.
We keep a Vector Clock D([Si, Vi]) for each data item in the database, and by comparing two clocks we can identify whether one version of a data item precedes, succeeds, or is in conflict with another, where Si is the server writing data item D and updating its version to Vi (which always increases with every write).
Vector Clock = D([Si, Vi])
Example - D([S1, V1], [S2, V2], ..., [Sn, Vn]) - this represents servers that've acted upon on a data item (D) and version changes it went through
Comparing Clocks: compare the two clocks server by server (corresponding servers should be the same); versions should be increasing, decreasing, or constant across all corresponding servers to establish a chronology between the two clocks - never mixed. A small comparison sketch follows the examples below.
D([Sx, 1]) -- parent (happened first chronologically)
D([Sx, 2]) -- child (happened later chronologically, in the next consecutive write)
D([Sx, 1]) -- parent
D([Sx, 1], [Sy, 1]) -- child
D([Sx, 1], [Sy, 2])
D([Sx, 2], [Sy, 1]) -- conflict; how can Sy's version decrease when Sx is clearly indicating succeeding clock (mixed)
D([Sx, 1], [Sy, 2])
D([Sx, 1], [Sy, 1]) -- this happened first (parent)
D([Sx, 1], [Sy, 1])
D([Sx, 1], [Sz, 1]) -- conflict; Sy and Sz both unaware of the other's write (parallel write; siblings)
D([Sx, 1], [Sy, 1], [Sz, 1]) -- after resolving above conflict
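A hedged sketch of the comparison rule above, representing a clock as a {server: version} dict (the dict form and function names are assumptions):

```python
def precedes(a: dict, b: dict) -> bool:
    # a precedes b if, for every server in a, b has seen at least that version
    return all(b.get(server, 0) >= version for server, version in a.items())

def compare(a: dict, b: dict) -> str:
    if a == b:
        return "identical"
    if precedes(a, b) and not precedes(b, a):
        return "a happened before b (parent)"
    if precedes(b, a) and not precedes(a, b):
        return "b happened before a (parent)"
    return "conflict (sibling writes, reconcile on read)"

print(compare({"Sx": 1}, {"Sx": 1, "Sy": 1}))            # a happened before b (parent)
print(compare({"Sx": 1, "Sy": 2}, {"Sx": 2, "Sy": 1}))   # conflict
```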
Storing vector clocks along with data objects is an overhead. If they get too long, just delete the records of old writes (i.e. the leftmost clock entries).
Once a server detects that another server is down, it propagates info to other servers and they check their membership list to confirm that it is indeed down. They mark it as down and propagate the info to other servers.
node membership list - memberID, heartbeat counter, last received
If a server’s heartbeat counter has not increased for more than a predefined period, it is considered as offline.
Used in P2P systems like the BitTorrent protocol.
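A toy sketch of the membership-list bookkeeping described above (the timeout value and dict layout are assumptions):

```python
import time

HEARTBEAT_TIMEOUT = 10  # seconds without a counter increase => suspect offline

# membership list kept on every node: member_id -> heartbeat counter + last time it increased
membership = {
    "node_a": {"heartbeat": 152, "last_updated": time.time()},
    "node_b": {"heartbeat": 97,  "last_updated": time.time() - 30},
}

def on_gossip(member_id: str, heartbeat: int):
    # merge a gossiped heartbeat: only move forward, never backwards
    entry = membership.get(member_id)
    if entry is None or heartbeat > entry["heartbeat"]:
        membership[member_id] = {"heartbeat": heartbeat, "last_updated": time.time()}

def offline_members() -> list:
    now = time.time()
    return [m for m, e in membership.items()
            if now - e["last_updated"] > HEARTBEAT_TIMEOUT]

print(offline_members())  # ["node_b"] - its counter hasn't increased recently
```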
Instead of enforcing the quorum requirement by choosing from the N nodes that store replicas of a particular piece of data (and not the total number of replica nodes in the system), the system chooses the first W healthy nodes for writes and the first R healthy nodes for reads on the hash ring. Offline servers are ignored.
When the down server goes up, changes will be pushed back to the replicas which are supposed to store that data to achieve data consistency - Hinted Handoff.
Even if W + R > N, strong consistency can't be guaranteed in the case of Sloppy Quorums:
with N = 3 (A, B, C), we wrote to nodes A, C and read from D, E because of down servers
there is no overlap even though the condition was satisfied
hinted handoff will add the data back to B once it's up
Enabled by default in DynamoDB and Riak.
Keep replicas in sync periodically so that if one of them goes down, others have the data we need.
Anti-entropy involves comparing each piece of data on replicas and updating each replica to the newest version. A Merkle tree is used for inconsistency detection and minimizing the amount of data transferred.
A Hash tree or Merkle tree is a tree in which every non-leaf node contains a hash of its child nodes' hashes, with the actual replica contents as leaves. Hash trees allow efficient and secure verification of the contents of large data structures by localizing the mismatched parts of the data.
Used in version control systems like Git too.
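A small sketch of building Merkle trees over bucketed key ranges and localizing the out-of-sync bucket; the bucket contents and SHA-256 choice are illustrative:

```python
from hashlib import sha256

def merkle_levels(buckets: list) -> list:
    # build the tree bottom-up; levels[0] holds the leaf (bucket) hashes,
    # levels[-1] holds the single root hash
    level = [sha256(b.encode()).hexdigest() for b in buckets]
    levels = [level]
    while len(level) > 1:
        pairs = [level[i] + (level[i + 1] if i + 1 < len(level) else "")
                 for i in range(0, len(level), 2)]
        level = [sha256(p.encode()).hexdigest() for p in pairs]
        levels.append(level)
    return levels

# each "bucket" would really be a hash over a key range on that replica
replica_a = ["k1=v1", "k2=v2", "k3=v3", "k4=v4"]
replica_b = ["k1=v1", "k2=v2", "k3=STALE", "k4=v4"]

tree_a, tree_b = merkle_levels(replica_a), merkle_levels(replica_b)
if tree_a[-1] != tree_b[-1]:
    # root hashes differ; in practice you'd walk down only the mismatching branches,
    # here we simply diff the leaf level to localize the stale bucket
    stale = [i for i, (x, y) in enumerate(zip(tree_a[0], tree_b[0])) if x != y]
    print("out-of-sync buckets:", stale)   # [2]
```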
Bloom filter is a space-efficient probabilistic data structure to check set membership. A bit array is often used for the implementation.
Answers the question “Is this item in the set?” with either a confident “no” or a nervous “probably yes”. False positives are possible, therefore it is probabilistic.
We can check the database (expensive operation though) after bloom filter has answered with a “probably yes”.
Can’t remove an item from the Bloom Filter, it never forgets. Removal isn’t possible since entries to the table for multiple items may overlap.
Choose all hash functions with equal hash space, or more practically, perform modulo N on the resulting hash, where N is the filter size (i.e. number of buckets).
Add - take k hash functions, hash the item with each, modulo each resultant hash with N, and set the corresponding buckets in the filter.
Check - hash the item with the same k hash functions, modulo each resultant hash with N, and check whether the corresponding buckets in the filter are set. If they're all set, then the item “may exist”, otherwise it “definitely doesn't exist”.
The larger the Bloom Filter, the fewer false positives it gives. Memory vs accuracy trade-off.
Algorithmic Complexity - O(k) TC and O(1) SC (both constant).
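A minimal bit-array Bloom filter sketch; the filter size, k, and the double-hashing trick used to derive k hash values are assumptions rather than any specific library's API:

```python
from hashlib import md5, sha1

class BloomFilter:
    def __init__(self, size: int = 1024, k: int = 3):
        self.size = size          # N buckets
        self.k = k                # number of hash functions
        self.bits = [0] * size

    def _positions(self, item: str):
        # derive k hash values from two base hashes (a common trick),
        # then modulo the filter size to pick bucket indices
        h1 = int(md5(item.encode()).hexdigest(), 16)
        h2 = int(sha1(item.encode()).hexdigest(), 16)
        return [(h1 + i * h2) % self.size for i in range(self.k)]

    def add(self, item: str):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item: str) -> bool:
        # all buckets set => "probably yes"; any unset => "definitely no"
        return all(self.bits[pos] for pos in self._positions(item))

bf = BloomFilter()
bf.add("user_1")
print(bf.might_contain("user_1"))   # True (probably)
print(bf.might_contain("user_2"))   # almost certainly False
```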
Challenge: generating unique IDs in a distributed environment.
Multi-Master Replication - use SQL auto_increment on two servers
UUID - generate UUID anywhere (scales out really well)
Ticket Server - a server that can output unique IDs based on SQL auto_increment, uses single row
Twitter Snowflake ID - a new format for unique IDs (use generator servers to produce; scales out well)
Instagram's Approach - snowflake like IDs
Multi-Master Replication: uses the database auto_increment feature. Instead of increasing the next ID by 1, we increase it by k, where k is the number of database servers in use. This is done to divide the generated ID space across servers.
Ex - Keep two servers to generate even and odd IDs respectively, and round-robin between the two to load balance and handle downtime. Not very scalable.
Server1:
auto-increment-increment = 2
auto-increment-offset = 1
Server2:
auto-increment-increment = 2
auto-increment-offset = 2
Server1 generates ID - 1, 3, 5 ...
Server2 generates ID - 2, 4, 6 ...
Cons: not scalable or customizable.
UUID (Universally Unique Identifier): 128-bit standard unique IDs that require no coordination when generating. Every UUID ever generated anywhere is guaranteed to be unique!
“after generating 1 billion UUIDs every second for approximately 86 years would the probability of creating a single duplicate reach 50%” - Wikipedia
Standard Representation => lower case Hex groups separated by hyphen = 8-4-4-4-12
Ex - 550e8400-e29b-41d4-a716-446655440000
Cons: super scalable, but not customizable (UUIDs have a fixed standardized format).
Ticket Server: (ab)using the auto_increment feature of a MySQL database and the REPLACE INTO query to generate IDs using a single row. Keep multiple tables like Tickets32 and Tickets64 for varying ID sizes. Keep two servers in the multi-master fashion described above to ensure high availability.
Cons: SPOF, not really scalable or customizable.
Snowflake ID: 64-bit binary ID written as decimal. Guaranteed to be unique since it stores timestamp, machine, and datacenter info. sequence_number is almost always 0, but if we generate multiple IDs in under 1 ms, it is incremented to help us differentiate between them.
snowflake_id = timestamp + datacenter_id + machine_id + sequence_number
Ex - 1541815603606036480
Used at Twitter, Discord, Instagram, and Mastodon.
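A simplified generator following the layout above (bit widths follow Twitter's published format - 41-bit timestamp, 5-bit datacenter, 5-bit machine, 12-bit sequence; the epoch constant and example values are illustrative):

```python
import time

CUSTOM_EPOCH_MS = 1288834974657  # commonly cited Twitter epoch; any fixed past epoch works

class SnowflakeGenerator:
    def __init__(self, datacenter_id: int, machine_id: int):
        self.datacenter_id = datacenter_id & 0x1F   # 5 bits
        self.machine_id = machine_id & 0x1F         # 5 bits
        self.sequence = 0                           # 12 bits
        self.last_ts = -1

    def next_id(self) -> int:
        ts = int(time.time() * 1000) - CUSTOM_EPOCH_MS
        if ts == self.last_ts:
            # more than one ID in the same millisecond: bump the sequence number
            self.sequence = (self.sequence + 1) & 0xFFF
            if self.sequence == 0:
                while ts <= self.last_ts:           # sequence overflow: wait for the next ms
                    ts = int(time.time() * 1000) - CUSTOM_EPOCH_MS
        else:
            self.sequence = 0
        self.last_ts = ts
        # timestamp | datacenter | machine | sequence
        return (ts << 22) | (self.datacenter_id << 17) | (self.machine_id << 12) | self.sequence

gen = SnowflakeGenerator(datacenter_id=1, machine_id=7)
print(gen.next_id())   # 64-bit integer, roughly sortable by creation time
```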
Instagram’s Approach: uses snowflake like 64-bit ID. Uses Postgres logical shards feature.
id = ts + logical_shard_id + table_auto_increment_seq_value
This Primary ID will be unique across all tables in the application (and as a bonus contains the shard ID).
Specify the HTTP response code along with a Location header. Either send back 301 - Moved Permanently (the browser caches the response and all subsequent requests go to the new URL) or 302 - Found (moved temporarily; subsequent requests keep going to the short URL first, which is better for analytics). The client will redirect automatically to the URL specified in the Location header.
It redirects automatically since it's standard HTTP! The response with the code 301 doesn't even show up in Postman (since it follows redirects by default). This eliminates the need for any redirect logic on the client.
API Design
-----------
POST /api/v1/shorten
Send long URL in JSON Body, get short URL back
GET /api/v1/longUrl
Send short URL in body or as query param, get long URL back (API use case only)
GET /{short_code}
Redirects using the HTTP status code and response header (main use case)
Design Pointers:
- SHA-1: long fixed-size hash, but we take only a fragment of it, which increases collision probability.
- base62 encoding: encodes to 0-9a-zA-Z (a total of 62 character mappings); long variable-sized output string, but we encode the corresponding unique id of the long URL entry in the data store, so it becomes collision-free and meaningless to snoopers. The only caveat is that it can be reverse engineered to find the next id (next URL) since it is an encoding (two-way). A short sketch follows this list.
- 62^7 ~ 3.5 trillion URLs.
- URL_table(id, shortURL, longUrl).
.Crawl in a BFS fashion, because DFS can be infinitely deep.
URL Frontier Internals:
[Prioritizer]
|
diff queues for diff priority sites (front queues)
|
[Queue Router]
|
diff queues for each domain (back queues; for politeness)
|
[Queue Selector]
| | |
worker1 worker2 worker3
Optimizations:
- respect robots.txt, and beware of spider traps by limiting the crawl to a certain depth
We send requests to third-party Push Notification, SMS, or Email servers, and they send out messages to clients.
Microservices [S1 ... Sn]
|
Notification Server (auth, rate limit) + Cache + DB
|
Queues (one for each - Android, iOS, SMS, Email)
|
Workers (pull messages from queue and send to third-party, retry mechanism, maintain log, templating)
|
Third Party Service
|
Client
Analytics Service (receives events from the Notification server and the client; used to track message delivery and user actions)
In a distributed system, it is impossible to ensure exactly-once delivery (Two Generals Problem). Since there is no ACK from the Third-Party service indicating whether the message has reached the Client, it is acceptable to send the same message multiple times. To reduce such duplication, maintain a log (on the Worker) to check whether the message has been seen before.
For each user, maintain a temporary (ephemeral) News Feed Cache and we can write to our friends' feed cache, or others can read from our feed cache. Both may use a Fanout Service to do so.
Fanout: one-to-many spread of data
Steps for Push Model:
- write user_id and post_id pairs to the News Feed Caches; fetch the friends list using a Graph DB, and user details from the User DB for filtering (user blocklists, etc)
- on feed read, fetch the full post and user data for the post_id and user_id in each pair from the respective DBs
Choose a hybrid model: celebrity posts are delivered via the pull model, while normal accounts use the push model (since their posts and followers are far fewer).
Post Service
Post Cache + Post DB
Fanout Service (and its workers) (filters users and content)
Notification Service
User Relation DB (graph datastore)
User Cache + UserDB
Message Queue(s)
News Feed Service (used only during build feed)
News Feed Cache (stores only <user_id> to <post_id> pairs to save space; get data from UserDB and PostDB)
Both users connect to chat servers via WebSocket Protocol, and servers are synced with each other using MQs in between.
Types:
- 1-on-1 chat (messages identified by a message_id)
- group chat (messages identified by a message_id and a channel_id)
When a new message is to be delivered to a client, how can the chat server send it, given that connections are initiated by the client and are supposed to be short-lived?
Three types of components:
Storage:
Service Discovery - finds the nearest chat server for our API server (Apache ZooKeeper)
Message Sync Queues - One MQ per receiver: WeChat uses this, not so scalable for Group Chat
KV Store - stores messages (especially when user is offline)
Presence Servers - User sends a heartbeat to PS at regular intervals, if timeout happens and we didn't receive it, then user's status changes, fanout status to other users and store last_seen in datastore
Sync Devices of a Single User - store latest message_id in a datastore on chat server shared between both devices
Ping with GET request multiple times each second as the user is typing.
search?q=di
search?q=din
search?q=dinn
search?q=dinne
search?q=dinner
For small data sets, use a SQL relation Query(word, freq) and get the top k matches using a LIKE clause ordered by freq for each partial search word.
For large data sets, get k most searched queries using Trie data structure for fast retrieval. Build tries periodically with query data from the Aggregator Service and store in Trie DB with a Trie Cache in between. Store Trie across multiple shards to scale storage.
Trie TC = O(p) + O(c) + O(c log c)
Optimization:
Optimized Trie TC = O(1)
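A bare-bones prefix trie sketch matching the TC breakdown above; it caches nothing at the nodes - one common way to reach the O(1) figure is to additionally cache the top-k queries at each node:

```python
import heapq

class TrieNode:
    def __init__(self):
        self.children = {}
        self.freq = 0            # > 0 only at the end of a complete query

class Trie:
    def __init__(self):
        self.root = TrieNode()

    def insert(self, query: str, freq: int):
        node = self.root
        for ch in query:                      # O(p), p = query length
            node = node.children.setdefault(ch, TrieNode())
        node.freq = freq

    def top_k(self, prefix: str, k: int = 5):
        node = self.root
        for ch in prefix:                     # walk down to the prefix node: O(p)
            if ch not in node.children:
                return []
            node = node.children[ch]
        results = []                          # collect all completions under it: O(c)
        stack = [(prefix, node)]
        while stack:
            word, cur = stack.pop()
            if cur.freq:
                results.append((cur.freq, word))
            for ch, child in cur.children.items():
                stack.append((word + ch, child))
        return [w for _, w in heapq.nlargest(k, results)]  # the O(c log c) part

trie = Trie()
for q, f in [("dinner", 90), ("dine", 40), ("dinosaur", 70), ("disk", 10)]:
    trie.insert(q, f)
print(trie.top_k("din"))   # ['dinner', 'dinosaur', 'dine']
```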
Original Storage - store original quality videos
CDN - store transcoded videos as BLOB (Binary Large OBjects)
API Servers - everything else
Upload: transcode the video with Transcoding servers and, in parallel, save metadata to the DB (use a Completion Queue and Completion Handler to check whether transcoding was successful), then push the video to CDNs worldwide (push model).
Video must be quickly playable on all kinds of devices, with seek (random access). Transcode it, save different bitrates (for adaptive bitrate), and save different resolutions.
Transcoding - video container (.mp4, .webm) and compression codec (H.264, VP9, HEVC), split video in sections (GOP alignment), split video into channels (video, audio, captions, watermark) (use DAG), save diff resolutions of the same video
DAG: we can configure and parallelize steps in transcoding by defining a DAG, combine them all at the end. Use DAG scheduler to put task messages in the queue.
Use MQs between every component to make entire pipeline async. Get a “pre-signed” URL from Amazon S3 or Azure Blob Storage using API, and use that URL to upload video files to them.
DRM - Google Widevine
AES Encryption - encrypt video during upload, decrypt during playback
Long-Tail Distribution - store the most popular videos in the CDN, and videos with very few views on Video Servers
Stream: download in chunks using streaming protocols like MPEG-DASH (Dynamic Adaptive Streaming over HTTP), Apple HLS (HTTP Live Streaming), or WebRTC. The resolution and bitrate may change frequently based on the client’s network bandwidth.
Enhancements - live streaming (real-time video transcoding) and content filtering.
References:
Needs sync, so strong consistency is expected; use a relational database (ACID out of the box) to store file metadata.
Use third-party large storage service like Amazon Simple Storage Service (S3). Provides automatic same-region and cross-region replication, sharding, etc.
Upload: start uploading the file to block servers and, in parallel, save file metadata to the DB. Upon completion of the upload, the storage service sends a callback request indicating upload completion to the API service, which updates the file upload status to completed and triggers a notification to the user via the Notification service.
Edit/Download: notify all online users to pull the latest data via the Notification server; clients then fetch fresh metadata, block servers download the file blocks and perform delta sync, and the new blocks are sent to the client, where they're combined into actual files using data from the Metadata DB.
Allow resumable uploads (?uploadType=resumable) to recover from network failures during uploads.
Block Servers - split files into blocks before uploading to S3, compress (gzip, bzip2) and encrypt each block, store block metadata (block_id, hash, block_order) in Metadata DB to avoid duplicates, etc. Downloads happen to block servers first too instead of directly downloading onto the client for delta sync.
Delta Sync - on file modification, only modified blocks are synced using a sync algorithm e.g. Differential Synchronization.
Notification service is used for sync - accessed by other online clients via long polling. Send notifs to online clients upon file edits or new file creation. If a client is offline, save file changes in cache or offline backup MQ to replay when it becomes active.
Optimizations:
Links: