
From Rust Web Development by Bastian Gruber

This article discusses how to safely update shared data across threads in Rust.




When operating an asynchronous web server, we must be aware that thousands of requests (or more) can come in within the same second, and every request wants to write or read data. We have a single data structure that holds the state of our application, but what happens when two or more requests want to write to the same structure or read from it?

We must give each request access to our Store separately and tell the other processes or requests to wait until the previous read or write on the Store is done. In this case, two processes (or more) want to update the same data structure. We need to put the other processes on a waiting list so that only one at a time alters our data.

In addition, Rust has a unique view of ownership. Only one instance or process can have ownership over a particular variable or object at a time. This is to prevent race conditions and dangling pointers, where data is referenced which is no longer there. It seems we have to wait for one request to finish, so that it can return the ownership of the Store to the next one. This runs completely counter to the asynchronous mindset.

We’re facing two problems:

  • We want to prevent two or more processes from altering data at the same time.
  • We want to give each route handler ownership over the data store if needed to mutate it.

Before we can even think about having waiting lists on our Store to alter data, we first need to make sure Rust can share the ownership of a state. Let’s tackle the second problem first. Rust moves ownership when passing variables around in our code. Figure 1 shows how, when assigning a complex value (like a String) to another variable, Rust drops the first variable to make sure only one pointer on the stack has ownership over this structure on the heap and controls all modifications to it.


Figure 1: Re-assigning a complex data type like a String to another variable is internally moving ownership over to the new variable and dropping the old one.
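
The move shown in Figure 1 can be reproduced in a few lines. This is a minimal sketch, not code from the book; the function name consume is made up for illustration.

```rust
// Hypothetical helper: takes ownership of the String, so the caller
// can no longer use the value it passed in.
fn consume(s: String) -> usize {
    s.len()
}

fn main() {
    let x = String::from("hello");
    let y = x; // ownership moves from x to y
    // println!("{}", x); // would not compile: x was moved

    let copy = y.clone(); // an explicit, independent copy on the heap
    let len = consume(y); // y is moved into the function and gone afterwards
    println!("{} has {} bytes", copy, len);
}
```

Uncommenting the println! line makes the compiler reject the program, which is exactly the restriction the following paragraphs work around.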


This concept is a problem for us right now. Rust’s safety measures prevent us from sharing data between different functions and threads, because whenever we pass a value to a new function, we transfer the ownership of this value and have to wait until we get it back. Two options come to mind:

  • Create a copy of our store for each route handler
  • Wait until one route handler is finished to give the ownership of the store back and pass it on to the next one

Neither addresses the underlying issue in an appropriate way. The first option pollutes our memory and we still can’t mutate the data inside the original store, and the second option works counter to our asynchronous approach.
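
To see why the first option falls short, here is a small sketch under simplified assumptions: the store is a plain HashMap<u32, String>, and handler_with_copy is a hypothetical stand-in for a route handler.

```rust
use std::collections::HashMap;

// Hypothetical handler that receives its own copy of the store.
// Its insert only affects the copy, never the original.
fn handler_with_copy(mut store: HashMap<u32, String>) -> usize {
    store.insert(2, "second question".to_string());
    store.len()
}

fn main() {
    let mut store = HashMap::new();
    store.insert(1, "first question".to_string());

    let seen_by_handler = handler_with_copy(store.clone());
    println!(
        "handler saw {} entries, original still has {}",
        seen_by_handler,
        store.len()
    );
}
```

The handler's write is lost as soon as its copy goes out of scope, and every request pays for a full copy of the data.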

Lucky for us, Rust comes equipped to deal with these problems. Specifically:

  • Rc<T>
  • Arc<T>

The Rc or Arc type places the underlying data structure T on the heap and creates a pointer on the stack. Now you can make a copy of that pointer that references the same data. The difference between the two is that Rc only works within a single thread, whereas Arc is safe to use across multiple threads, so it lets you share data between different threads.


Figure 2: Instead of dropping the value x, Rust increments the Arc count. Whenever x or y goes out of scope, it decreases the count; once the count reaches zero, it calls .drop() to remove the value from the heap.


The Arc type is “Atomically Reference Counted”. It’s like a container, which moves the wrapped data onto the heap and creates a pointer to it on the stack. When cloning an Arc, you clone the pointer, which points to the same data structure on the heap, and internally, Arc increments its count. When the internal count reaches zero (when all variables pointing to the value go out of scope), Arc drops the value. This makes it safe to share complex data on the heap between different variables.
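
The counting behavior can be observed with Arc::strong_count from the standard library. The sketch below uses two made-up helper functions: one shows the count going up and down, the other hands clones of the same Arc to several threads.

```rust
use std::sync::Arc;
use std::thread;

// Returns the reference count while two pointers exist, and after one is dropped.
fn count_after_clone() -> (usize, usize) {
    let x = Arc::new(String::from("shared"));
    let y = Arc::clone(&x); // clones the pointer, not the String
    let while_shared = Arc::strong_count(&x);
    drop(y); // y goes away, the count decreases
    let after_drop = Arc::strong_count(&x);
    (while_shared, after_drop)
}

// Each thread gets its own clone of the Arc and reads the same Vec on the heap.
fn total_len_across_threads(data: Arc<Vec<String>>, threads: usize) -> usize {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let local = Arc::clone(&data);
            thread::spawn(move || local.len())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let (shared, after) = count_after_clone();
    println!("count while shared: {}, after drop: {}", shared, after);

    let questions = Arc::new(vec!["q1".to_string(), "q2".to_string()]);
    println!("sum of lengths: {}", total_len_across_threads(questions, 3));
}
```

Note that an Arc on its own only grants shared, read-only access; mutation needs an additional lock.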

We run on top of tokio, which means we need to use Arc<T> and wrap our data store in it, but this is only one part of the solution. Reading from the same Store is fine, but we also want the chance to mutate it. An HTTP POST request on one thread can add questions, and an HTTP PUT request on another thread can try to alter an existing one. Therefore, we need a way to coordinate these writes. Rust also has us covered in this scenario. We can use either of these two types:

  • Mutex
  • RwLock

Both control access to the underlying data: they lock the data as soon as a writer or reader wants access, and unlock it for the next reader or writer when the previous one is finished. The difference is that a Mutex blocks for either a writer or a reader, whereas a RwLock allows many readers simultaneously but only one writer at a time.

We must be cautious: both types live in the std::sync module, which indicates that they aren’t the best fit for an async environment like ours. There are implementations of the RwLock type for an async environment that we can add to our project.

We choose the library parking_lot, which is heavily used in production environments at larger companies and therefore has our trust for now. We add the library to our Cargo.toml file:
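
The difference between the two lock types can be checked with the non-blocking try_lock, try_read, and try_write methods. This sketch uses the std::sync versions purely for illustration (an assumption made to keep it dependency-free), and the function names are invented.

```rust
use std::sync::{Mutex, RwLock};

// A Mutex hands out one guard at a time, no matter whether we read or write.
fn mutex_refuses_second_lock() -> bool {
    let m = Mutex::new(0);
    let _guard = m.lock().unwrap();
    let refused = m.try_lock().is_err();
    refused
}

// A RwLock lets a second reader in while the first one is still active.
fn rwlock_allows_second_reader() -> bool {
    let lock = RwLock::new(0);
    let _first = lock.read().unwrap();
    let second_ok = lock.try_read().is_ok();
    second_ok
}

// A writer has to wait for as long as any reader holds the lock.
fn rwlock_refuses_writer_while_reading() -> bool {
    let lock = RwLock::new(0);
    let _reader = lock.read().unwrap();
    let refused = lock.try_write().is_err();
    refused
}

fn main() {
    println!("mutex refuses second lock: {}", mutex_refuses_second_lock());
    println!("rwlock allows second reader: {}", rwlock_allows_second_reader());
    println!(
        "rwlock refuses writer while reading: {}",
        rwlock_refuses_writer_while_reading()
    );
}
```

For a store that is read far more often than it is written, this is the argument for choosing RwLock over Mutex.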

Listing 1: Adding parking_lot to our project

 
 …
 [dependencies]
 warp = "0.3"
 parking_lot = "0.10.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 tokio = { version = "1.1.1", features = ["full"] }
  

Now with everything in place, we can start to update our code base accordingly. First, we encapsulate our questions in an Arc, to place the data onto the heap and get multiple pointers to it. In addition, we wrap our questions structure in a RwLock, to prevent multiple writes at the same time.

Listing 2: Making our HashMap thread safe

 
 …
  
 use std::sync::Arc;
 use parking_lot::RwLock;
  
 …
  
 #[derive(Clone)]
 struct Store {
     questions: Arc<RwLock<HashMap<QuestionId, Question>>>,
 }
 …
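
To illustrate what the derived Clone on Store gives us, here is a self-contained sketch. It makes several assumptions that are not from the listing: it uses std::sync::RwLock instead of parking_lot (so .read() and .write() need an extra .unwrap()), it defines simplified QuestionId and Question types, and Store::new and write_through_clone are hypothetical helpers.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-ins for the types used in the article.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct QuestionId(String);

#[derive(Debug, Clone, PartialEq)]
struct Question {
    id: QuestionId,
    title: String,
}

#[derive(Clone)]
struct Store {
    questions: Arc<RwLock<HashMap<QuestionId, Question>>>,
}

impl Store {
    // Hypothetical constructor for an empty store.
    fn new() -> Self {
        Store {
            questions: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

// Writes through a clone of the store and reads the result via the original.
fn write_through_clone(store: &Store) -> usize {
    let handle = store.clone(); // clones the Arc pointer, not the HashMap
    let id = QuestionId("1".to_string());
    let question = Question { id: id.clone(), title: "How?".to_string() };
    handle.questions.write().unwrap().insert(id, question);
    let len = store.questions.read().unwrap().len();
    len
}

fn main() {
    let store = Store::new();
    println!("entries after write: {}", write_through_clone(&store));
    for question in store.questions.read().unwrap().values() {
        println!("{:?}: {}", question.id, question.title);
    }
}
```

Because cloning a Store only clones the Arc, every route handler can receive its own Store value while all of them operate on the same HashMap.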
  

We must update the way we read questions from our Store in the get_questions function as well:

Listing 3: Adjusting our way of reading the store

 
 …
 async fn get_questions(store: Store) -> Result<impl warp::Reply, warp::Rejection> {
     let res: Vec<Question> = store.questions.read().values().cloned().collect();
  
     Ok(warp::reply::json(&res))
 }
 …
  

A simple .read() on the questions is enough to request read access from the RwLock. With the updated wrapping of our Store structure in mind, we create two new functions: updating and inserting questions.
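
One detail worth spelling out: the read guard only lives for the duration of the statement that collects the values, so the lock is free again right afterwards. The sketch below shows this with std::sync::RwLock and a simplified map of u32 to String (both assumptions for illustration); the Questions alias and snapshot function are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-in for the questions field of the Store.
type Questions = Arc<RwLock<HashMap<u32, String>>>;

// Takes the read lock, copies the values out, and releases the lock
// at the end of the first statement. Sorted to get a stable order.
fn snapshot(questions: &Questions) -> Vec<String> {
    let mut titles: Vec<String> = questions.read().unwrap().values().cloned().collect();
    titles.sort();
    titles
}

fn main() {
    let questions: Questions = Arc::new(RwLock::new(HashMap::new()));
    questions.write().unwrap().insert(1, "First".to_string());

    let before = snapshot(&questions);
    // The read lock is already released, so this write does not block.
    questions.write().unwrap().insert(2, "Second".to_string());

    println!("snapshot: {:?}", before);
    println!("current: {:?}", snapshot(&questions));
}
```

The snapshot is an independent copy, so later writes to the store do not change a response that is already being serialized.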

Adding a question

We solved our problem of handling state in a thread-safe manner, and now we can go ahead and implement the rest of our API routes and explore how we parse bodies from an HTTP request and read parameters from a URL. The first route we add accepts HTTP POST requests on the /questions path.


Figure 3: We expect new questions in the body of the HTTP POST request on the /questions path.


Listing 4 shows our add_question route handler. We expect the store and a question to be passed to our function. We can then make use of the RwLock we implemented on the Store, and use the .write() method to request write access to it. Once we get it, we have access to the underlying HashMap and call .insert with our question.

Listing 4: Adding route handler for adding a question to the store

 
 …
  
 async fn add_question(store: Store, question: Question) -> Result<impl warp::Reply, warp::Rejection> {
     store.questions.write().insert(question.clone().id, question);
  
     Ok(warp::reply::with_status(
         "Question added",
         StatusCode::OK,
     ))
 }
  
 …
  

The insert method takes two arguments: the key for the HashMap and the value we want to store under it. We can spot Rust’s ownership principles here as well: if we accessed the id of the question directly in the first argument, we would move it out of the question. This would be fine if we didn’t use the question anywhere else again, but the second argument takes the whole question and stores it in the HashMap.

Therefore we .clone the question in the first argument to create a copy, and then give ownership of the initial question from the function parameters to the .insert method.
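
The ownership conflict can be reproduced without warp. The following sketch uses simplified QuestionId and Question types and a hypothetical add helper. It also shows a narrower variant that is not from the listing: cloning only the id instead of the whole question, which ends up with the same map contents.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the types used in the article.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct QuestionId(String);

#[derive(Debug, Clone, PartialEq)]
struct Question {
    id: QuestionId,
    title: String,
}

// Hypothetical helper mirroring the insert call in the route handler.
fn add(map: &mut HashMap<QuestionId, Question>, question: Question) {
    // map.insert(question.id, question);
    // ^ rejected by the compiler: the id is moved out of question first,
    //   so the partially moved question cannot be used as the value.

    // Cloning just the id leaves question intact for the second argument.
    map.insert(question.id.clone(), question);
}

fn main() {
    let mut map = HashMap::new();
    let question = Question {
        id: QuestionId("1".to_string()),
        title: "How?".to_string(),
    };
    add(&mut map, question);
    println!("{:?}", map);
}
```

Whether to clone the whole question or only its id is a small trade-off; the id-only clone avoids copying fields that are thrown away immediately.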

Listing 5: Adding the POST route for /questions

 
 …
  
 #[tokio::main]
 async fn main() {
     …
  
     let get_questions = warp::get()
         .and(warp::path("questions"))
         .and(warp::path::end())
         .and(store_filter.clone())
         .and_then(get_questions);
  
     let add_question = warp::post() #A
         .and(warp::path("questions")) #B
         .and(warp::path::end()) #C
         .and(store_filter.clone()) #E
         .and(warp::body::json()) #F
         .and_then(add_question); #G
  
     let routes = get_questions.or(add_question).with(cors).recover(return_error);
  
     warp::serve(routes)
         .run(([127, 0, 0, 1], 3030))
         .await;
 }

#A We create a new variable and use warp::post() this time to filter HTTP POST requests

#B We still listen on the same root path, /questions

#C We close the path definition

#E We add our store to this route to pass it to the route handler later

#F We extract the JSON body, which is getting added to the parameters as well

#G We call add_question with store and the json body as the parameters

We added the new route and included it in our routes variable. Be aware that we removed the individual .recover after the end of the get_questions filter, and added it to the end of the routes, because now we try the different routes before recovering from paths that weren’t found.

That’s all for now. Learn more about the book on Manning’s liveBook platform here.